2 Unix SMB/CIFS implementation.
3 Samba internal messaging functions
4 Copyright (C) Andrew Tridgell 2000
5 Copyright (C) 2001 by Martin Pool
6 Copyright (C) 2002 by Jeremy Allison
7 Copyright (C) 2007 by Volker Lendecke
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation; either version 3 of the License, or
12 (at your option) any later version.
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
19 You should have received a copy of the GNU General Public License
20 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 @defgroup messages Internal messaging framework
28 @brief Module for internal messaging between Samba daemons.
30 The idea is that if a part of Samba wants to do communication with
31 another Samba process then it will do a messaging_register() of a
32 dispatch function, and use messaging_send() to send messages to
33 that process.
35 The dispatch function is given the server_id of the sender, and it can
36 use that to reply by messaging_send(). See ping_message() for a
37 simple example.
39 @caution Dispatch functions must be able to cope with incoming
40 messages on an *odd* byte boundary.
42 This system doesn't have any inherent size limitations but is not
43 very efficient for large messages or when messages are sent in very
44 quick succession.
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
/*
 * One registered message handler. Entries are chained through prev/next
 * into the list rooted at messaging_context.callbacks: messaging_register()
 * adds them with DLIST_ADD and messaging_deregister() unlinks them with
 * DLIST_REMOVE.
 */
62 struct messaging_callback {
63 struct messaging_callback *prev, *next;
/*
 * Dispatch function. messaging_dispatch_classic() invokes it with the
 * messaging context, the registrant's private_data, the message type,
 * the sender's server_id and a pointer to the payload blob.
 */
65 void (*fn)(struct messaging_context *msg, void *private_data,
67 struct server_id server_id, DATA_BLOB *data);
/*
 * Per-process state of the messaging subsystem.
 */
71 struct messaging_context {
/*
 * Main event context. Classic callbacks are only run for records that
 * were received on this context (see messaging_dispatch_rec()).
 */
73 struct tevent_context *event_ctx;
/* List of handlers registered via messaging_register(). */
74 struct messaging_callback *callbacks;
/*
 * Filtered-read requests created since the last dispatch. They are parked
 * here and appended to "waiters" by messaging_append_new_waiters(), so a
 * dispatch loop that is iterating "waiters" never sees new entries appear.
 */
76 struct tevent_req **new_waiters;
77 size_t num_new_waiters;
/* Requests that messaging_dispatch_rec() offers each incoming record to. */
79 struct tevent_req **waiters;
/*
 * Backend used by messaging_send_iov_from() for destinations on a
 * different vnn; filled in by messaging_ctdbd_init() when clustering
 * is enabled.
 */
83 struct messaging_backend *remote;
/* Handle from server_id_db_init(); exposed through messaging_names_db(). */
85 struct server_id_db *names_db;
/*
 * Forward declarations: both helpers are defined further down in this
 * file but are already needed by messaging_rec_create() and
 * messaging_recv_cb().
 */
88 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
89 struct messaging_rec *rec);
90 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
91 struct tevent_context *ev,
92 struct messaging_rec *rec);
94 /****************************************************************************
95 A useful function for testing the message system.
96 ****************************************************************************/
/*
 * Handler registered for MSG_PING by messaging_init_internal().
 *
 * Logs the sender and the payload at debug level 1, then sends the very
 * same payload back to the sender as MSG_PONG. The payload is printed
 * with "%.*s" bounded by data->length, so it does not have to be
 * NUL-terminated; a NULL data->data is printed as the empty string.
 *
 * NOTE(review): the return value of messaging_send() is ignored, so a
 * failed PONG is silent.
 */
98 static void ping_message(struct messaging_context *msg_ctx,
101 struct server_id src,
104 struct server_id_buf idbuf;
106 DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
107 server_id_str_buf(src, &idbuf), (int)data->length,
108 data->data ? (char *)data->data : ""));
110 messaging_send(msg_ctx, src, MSG_PONG, data);
/*
 * Build a messaging_rec from a scatter/gather payload and an fd list.
 *
 * The iovec array is flattened into one buffer allocated on mem_ctx
 * (iov_buflen() sizes it, iov_buf() fills it), the fds are widened into
 * an int64_t array, a temporary record is assembled on the stack and the
 * result handed back is the copy made by messaging_rec_dup() on mem_ctx.
 *
 * More than INT8_MAX fds are rejected up front (presumably by returning
 * NULL; the return statement is not visible here).
 *
 * NOTE(review): fds64 is a variable length array of num_fds elements.
 * Callers such as messaging_send() pass num_fds == 0, which makes it a
 * zero-length VLA; C requires the size expression to be greater than
 * zero. Consider sizing it with at least one element.
 *
 * NOTE(review): buf is a separate allocation on mem_ctx and
 * messaging_rec_dup() copies the payload; confirm buf is released once
 * the duplicate has been made.
 */
113 struct messaging_rec *messaging_rec_create(
114 TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
115 uint32_t msg_type, const struct iovec *iov, int iovlen,
116 const int *fds, size_t num_fds)
120 struct messaging_rec *result;
122 if (num_fds > INT8_MAX) {
126 buflen = iov_buflen(iov, iovlen);
130 buf = talloc_array(mem_ctx, uint8_t, buflen);
134 iov_buf(iov, iovlen, buf, buflen);
137 struct messaging_rec rec;
138 int64_t fds64[num_fds];
141 for (i=0; i<num_fds; i++) {
145 rec = (struct messaging_rec) {
146 .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
147 .src = src, .dest = dst,
148 .buf.data = buf, .buf.length = buflen,
149 .num_fds = num_fds, .fds = fds64,
152 result = messaging_rec_dup(mem_ctx, &rec);
/*
 * Receive callback handed to messaging_dgm_ref(); private_data is the
 * messaging_context (type-checked with talloc_get_type_abort).
 *
 * Rejects datagrams shorter than MESSAGE_HDR_LENGTH and datagrams that
 * carry more than INT8_MAX fds, logging a warning in both cases. For
 * accepted datagrams the fds are taken over into a local array, the
 * header is decoded with message_hdr_get() and the record is passed to
 * messaging_dispatch_rec(). The record's payload points directly into
 * msg after the header (const is dropped with discard_const_p); nothing
 * is copied here.
 *
 * The trailing loop over the fds is presumably the clean-up path for
 * the rejected cases; its body is not visible here.
 *
 * NOTE(review): fds64 is a variable length array sized
 * MIN(num_fds, INT8_MAX). For the common num_fds == 0 case that is a
 * zero-length VLA, which C does not define. Consider clamping the size
 * to at least 1.
 */
160 static void messaging_recv_cb(struct tevent_context *ev,
161 const uint8_t *msg, size_t msg_len,
162 int *fds, size_t num_fds,
165 struct messaging_context *msg_ctx = talloc_get_type_abort(
166 private_data, struct messaging_context);
167 struct server_id_buf idbuf;
168 struct messaging_rec rec;
169 int64_t fds64[MIN(num_fds, INT8_MAX)];
172 if (msg_len < MESSAGE_HDR_LENGTH) {
173 DBG_WARNING("message too short: %zu\n", msg_len);
177 if (num_fds > INT8_MAX) {
178 DBG_WARNING("too many fds: %zu\n", num_fds);
183 * "consume" the fds by copying them and setting
184 * the original variable to -1
186 for (i=0; i < num_fds; i++) {
191 rec = (struct messaging_rec) {
192 .msg_version = MESSAGE_VERSION,
193 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
194 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
199 message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
201 DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
202 (unsigned)rec.msg_type, rec.buf.length, num_fds,
203 server_id_str_buf(rec.src, &idbuf));
205 messaging_dispatch_rec(msg_ctx, ev, &rec);
209 for (i=0; i < num_fds; i++) {
/*
 * talloc destructor for a messaging_context (installed in
 * messaging_init_internal()).
 *
 * Walks both the new_waiters and the waiters array; for every request
 * still present it removes the request's cleanup function and clears
 * the slot. The cleanup function messaging_filtered_read_cleanup()
 * dereferences the request's msg_ctx, so detaching it here presumably
 * keeps requests that outlive the context from touching freed memory.
 */
214 static int messaging_context_destructor(struct messaging_context *ctx)
218 for (i=0; i<ctx->num_new_waiters; i++) {
219 if (ctx->new_waiters[i] != NULL) {
220 tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
221 ctx->new_waiters[i] = NULL;
224 for (i=0; i<ctx->num_waiters; i++) {
225 if (ctx->waiters[i] != NULL) {
226 tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
227 ctx->waiters[i] = NULL;
/*
 * Return "<lp_private_dir()>/<name>".
 *
 * The string is allocated on talloc_tos(), i.e. on whatever talloc
 * stackframe is current in the caller. Callers are expected to handle a
 * NULL result (allocation failure).
 */
234 static const char *private_path(const char *name)
236 return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
/*
 * Create and initialise a messaging_context.
 *
 * Steps, in order:
 *  - make sure the "msg.lock" lock directory and the "msg.sock" private
 *    directory exist (NT_STATUS_ACCESS_DENIED if either cannot be set up);
 *  - allocate the zeroed context on a temporary stackframe;
 *  - set ctx->id to the current pid with NONCLUSTER_VNN;
 *  - obtain the datagram messaging reference (messaging_dgm_ref);
 *  - install messaging_context_destructor;
 *  - when clustering is enabled, initialise the ctdbd backend and take
 *    the vnn from get_my_vnn();
 *  - open the names database;
 *  - register the MSG_PING handler and the debugging related handlers;
 *  - on success move the context from the stackframe to mem_ctx with
 *    talloc_steal and store it in *pmsg_ctx.
 *
 * Unix error codes from the dgm and ctdbd layers are translated with
 * map_nt_error_from_unix().
 *
 * NOTE(review): lock_path() and private_path() are called before
 * talloc_stackframe(); private_path() allocates on talloc_tos(), so that
 * string lands on the caller's frame rather than on the local one.
 * Consider creating the frame first.
 *
 * NOTE(review): the NTSTATUS returned by messaging_register() for
 * MSG_PING is ignored although that function can report
 * NT_STATUS_NO_MEMORY.
 *
 * NOTE(review): directory set-up failures are only logged at DBG_DEBUG
 * level.
 */
239 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
240 struct tevent_context *ev,
241 struct messaging_context **pmsg_ctx)
244 struct messaging_context *ctx;
245 NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
247 const char *lck_path;
248 const char *priv_path;
251 lck_path = lock_path("msg.lock");
252 if (lck_path == NULL) {
253 return NT_STATUS_NO_MEMORY;
256 ok = directory_create_or_exist_strict(lck_path,
260 DBG_DEBUG("Could not create lock directory: %s\n",
262 return NT_STATUS_ACCESS_DENIED;
265 priv_path = private_path("msg.sock");
266 if (priv_path == NULL) {
267 return NT_STATUS_NO_MEMORY;
270 ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
273 DBG_DEBUG("Could not create msg directory: %s\n",
275 return NT_STATUS_ACCESS_DENIED;
278 frame = talloc_stackframe();
280 return NT_STATUS_NO_MEMORY;
283 ctx = talloc_zero(frame, struct messaging_context);
285 status = NT_STATUS_NO_MEMORY;
289 ctx->id = (struct server_id) {
290 .pid = getpid(), .vnn = NONCLUSTER_VNN
297 ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
305 if (ctx->msg_dgm_ref == NULL) {
306 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
307 status = map_nt_error_from_unix(ret);
310 talloc_set_destructor(ctx, messaging_context_destructor);
312 if (lp_clustering()) {
313 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
316 DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
318 status = map_nt_error_from_unix(ret);
322 ctx->id.vnn = get_my_vnn();
324 ctx->names_db = server_id_db_init(ctx,
328 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
329 if (ctx->names_db == NULL) {
330 DBG_DEBUG("server_id_db_init failed\n");
331 status = NT_STATUS_NO_MEMORY;
335 messaging_register(ctx, NULL, MSG_PING, ping_message);
337 /* Register some debugging related messages */
339 register_msg_pool_usage(ctx);
340 register_dmalloc_msgs(ctx);
341 debug_register_msgs(ctx);
344 struct server_id_buf tmp;
345 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
348 *pmsg_ctx = talloc_steal(mem_ctx, ctx);
350 status = NT_STATUS_OK;
/*
 * Convenience wrapper around messaging_init_internal() that hands back
 * the context pointer instead of an NTSTATUS. The status is checked with
 * NT_STATUS_IS_OK; on failure the function presumably returns NULL (ctx
 * is initialised to NULL, the return statements are not visible here),
 * so the specific error is not available to the caller.
 */
357 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
358 struct tevent_context *ev)
360 struct messaging_context *ctx = NULL;
363 status = messaging_init_internal(mem_ctx,
366 if (!NT_STATUS_IS_OK(status)) {
/*
 * Variant of messaging_init() that reports the NTSTATUS of
 * messaging_init_internal() directly and delivers the new context
 * through pmsg_ctx.
 */
373 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
374 struct tevent_context *ev,
375 struct messaging_context **pmsg_ctx)
377 return messaging_init_internal(mem_ctx,
/*
 * Accessor for the server_id of a messaging context. The body is not
 * visible here; presumably it returns msg_ctx->id (TODO confirm).
 */
382 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
/*
 * Bring an inherited messaging_context up to date in a forked child.
 *
 * Drops the inherited datagram reference, rewrites msg_ctx->id with the
 * child's pid (the vnn is kept), obtains a fresh datagram reference on
 * the context's event context, re-initialises the ctdbd backend when
 * clustering is enabled and finally re-initialises the names database
 * with the new id. Unix error codes are mapped with
 * map_nt_error_from_unix().
 *
 * NOTE(review): the result of private_path("msg.sock") is passed to
 * messaging_dgm_ref() without a NULL check, whereas lck_path is checked.
 *
 * NOTE(review): the failure log after messaging_ctdbd_reinit() reads
 * "messaging_ctdbd_init failed", naming the wrong function.
 */
388 * re-init after a fork
390 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
395 TALLOC_FREE(msg_ctx->msg_dgm_ref);
397 msg_ctx->id = (struct server_id) {
398 .pid = getpid(), .vnn = msg_ctx->id.vnn
401 lck_path = lock_path("msg.lock");
402 if (lck_path == NULL) {
403 return NT_STATUS_NO_MEMORY;
406 msg_ctx->msg_dgm_ref = messaging_dgm_ref(
407 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
408 private_path("msg.sock"), lck_path,
409 messaging_recv_cb, msg_ctx, &ret);
411 if (msg_ctx->msg_dgm_ref == NULL) {
412 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
413 return map_nt_error_from_unix(ret);
416 if (lp_clustering()) {
417 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
421 DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
423 return map_nt_error_from_unix(ret);
427 server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
/*
 * Register fn as handler for msg_type.
 *
 * The callback list is searched for an entry with the same msg_type AND
 * the same private_data. If one exists it is overridden in place;
 * otherwise a new messaging_callback is allocated on msg_ctx and added
 * to msg_ctx->callbacks with DLIST_ADD. Returns NT_STATUS_NO_MEMORY when
 * that allocation fails.
 *
 * NOTE(review): the inner remark "Only one callback per type" does not
 * match the code: several callbacks for one type are accepted as long as
 * their private_data pointers differ.
 *
 * NOTE(review): in the override branch the only assignment visible here
 * stores private_data, which is already equal; confirm that fn is
 * replaced as well.
 */
434 * Register a dispatch function for a particular message type. Allow multiple
437 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
440 void (*fn)(struct messaging_context *msg,
443 struct server_id server_id,
446 struct messaging_callback *cb;
448 DEBUG(5, ("Registering messaging pointer for type %u - "
450 (unsigned)msg_type, private_data));
453 * Only one callback per type
456 for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
457 /* we allow a second registration of the same message
458 type if it has a different private pointer. This is
459 needed in, for example, the internal notify code,
460 which creates a new notify context for each tree
461 connect, and expects to receive messages to each of
463 if (cb->msg_type == msg_type && private_data == cb->private_data) {
464 DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
465 (unsigned)msg_type, private_data));
467 cb->private_data = private_data;
472 if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
473 return NT_STATUS_NO_MEMORY;
476 cb->msg_type = msg_type;
478 cb->private_data = private_data;
480 DLIST_ADD(msg_ctx->callbacks, cb);
/*
 * Remove every callback whose msg_type and private_data both match the
 * arguments. Matching entries are unlinked from ctx->callbacks with
 * DLIST_REMOVE. The loop advances through a separate "next" variable,
 * presumably captured before the current entry is unlinked so that
 * removal during iteration is safe (the assignment is not visible here).
 */
485 De-register the function for a particular message type.
487 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
490 struct messaging_callback *cb, *next;
492 for (cb = ctx->callbacks; cb; cb = next) {
494 if ((cb->msg_type == msg_type)
495 && (cb->private_data == private_data)) {
496 DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
497 (unsigned)msg_type, private_data));
498 DLIST_REMOVE(ctx->callbacks, cb);
/*
 * Send one DATA_BLOB as a message of msg_type to server.
 *
 * The blob is wrapped in a single iovec and passed to
 * messaging_send_iov() without any file descriptors. The iovec starts
 * out zeroed, so the message has an empty payload if the two assignments
 * are skipped; presumably they are guarded by a NULL check on data
 * (the condition is not visible here).
 */
505 Send a message to a particular server
507 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
508 struct server_id server, uint32_t msg_type,
509 const DATA_BLOB *data)
511 struct iovec iov = {0};
514 iov.iov_base = data->data;
515 iov.iov_len = data->length;
518 return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
/*
 * Send a raw byte buffer as a message: wraps buf/len in a DATA_BLOB via
 * data_blob_const() and forwards to messaging_send().
 */
521 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
522 struct server_id server, uint32_t msg_type,
523 const uint8_t *buf, size_t len)
525 DATA_BLOB blob = data_blob_const(buf, len);
526 return messaging_send(msg_ctx, server, msg_type, &blob);
/*
 * State carried by a message posted to ourselves: the context to
 * dispatch on and the record to deliver. Created in
 * messaging_post_self() and consumed by messaging_post_handler().
 */
529 struct messaging_post_state {
530 struct messaging_context *msg_ctx;
531 struct messaging_rec *rec;
/* Immediate-event handler, defined after messaging_post_self(). */
534 static void messaging_post_handler(struct tevent_context *ev,
535 struct tevent_immediate *ti,
/*
 * Queue a message for delivery to this very process.
 *
 * Instead of going through the datagram socket, a messaging_post_state
 * is allocated on msg_ctx, a tevent immediate and a copy of the message
 * (messaging_rec_create) are created as its children, and the immediate
 * is scheduled on msg_ctx->event_ctx with messaging_post_handler as
 * handler. Delivery therefore happens from the main event loop, not from
 * within this call.
 *
 * Returns an int; the values used on the failure paths are not visible
 * here (presumably errno-style, as the result is passed on by
 * messaging_send_iov_from()).
 */
538 static int messaging_post_self(struct messaging_context *msg_ctx,
539 struct server_id src, struct server_id dst,
541 const struct iovec *iov, int iovlen,
542 const int *fds, size_t num_fds)
544 struct tevent_immediate *ti;
545 struct messaging_post_state *state;
547 state = talloc(msg_ctx, struct messaging_post_state);
551 state->msg_ctx = msg_ctx;
553 ti = tevent_create_immediate(state);
557 state->rec = messaging_rec_create(
558 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
559 if (state->rec == NULL) {
563 tevent_schedule_immediate(ti, msg_ctx->event_ctx,
564 messaging_post_handler, state);
/*
 * Immediate-event handler scheduled by messaging_post_self(): recovers
 * the messaging_post_state from private_data (type-checked) and
 * dispatches the stored record on the event context that fired.
 *
 * NOTE(review): the state owns the immediate and the record; confirm it
 * is freed after dispatch (not visible here).
 */
572 static void messaging_post_handler(struct tevent_context *ev,
573 struct tevent_immediate *ti,
576 struct messaging_post_state *state = talloc_get_type_abort(
577 private_data, struct messaging_post_state);
578 messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
/*
 * Send a message with an explicit source id.
 *
 * Routing, in the order the checks appear:
 *  - a disconnected destination and more than INT8_MAX fds are refused;
 *  - a destination on a different vnn is handed to the remote backend's
 *    send_fn;
 *  - a destination equal to our own id is queued locally with
 *    messaging_post_self();
 *  - everything else goes out through messaging_dgm_send() with the
 *    encoded header (message_hdr_put) prepended as iov2[0] and the
 *    caller's iovecs copied behind it.
 *
 * messaging_dgm_send() appears twice; the second call is presumably a
 * retry whose guarding condition is not visible here. ECONNREFUSED gets
 * special treatment as "destination does not exist" (see the remark at
 * the end of the function).
 *
 * Returns an int error code; messaging_send_iov() feeds it to
 * map_nt_error_from_unix().
 *
 * NOTE(review): iov2 is a variable length array of iovlen+1 elements and
 * no bound on iovlen is visible; a negative or very large iovlen from a
 * caller is undefined behaviour or a stack overflow.
 */
582 int messaging_send_iov_from(struct messaging_context *msg_ctx,
583 struct server_id src, struct server_id dst,
585 const struct iovec *iov, int iovlen,
586 const int *fds, size_t num_fds)
589 uint8_t hdr[MESSAGE_HDR_LENGTH];
590 struct iovec iov2[iovlen+1];
592 if (server_id_is_disconnected(&dst)) {
596 if (num_fds > INT8_MAX) {
600 if (dst.vnn != msg_ctx->id.vnn) {
605 ret = msg_ctx->remote->send_fn(src, dst,
606 msg_type, iov, iovlen,
612 if (server_id_equal(&dst, &msg_ctx->id)) {
613 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
614 iov, iovlen, fds, num_fds);
618 message_hdr_put(hdr, msg_type, src, dst);
619 iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
620 memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
622 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
626 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
631 if (ret == ECONNREFUSED) {
633 * Linux returns this when a socket exists in the file
634 * system without a listening process. This is not
635 * documented in susv4 or the linux manpages, but it's
636 * easily testable. For the higher levels this is the
637 * same as "destination does not exist"
/*
 * Send a scatter/gather message (optionally with fds) from this process:
 * calls messaging_send_iov_from() with msg_ctx->id as source and
 * converts a resulting Unix error code with map_nt_error_from_unix().
 */
645 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
646 struct server_id dst, uint32_t msg_type,
647 const struct iovec *iov, int iovlen,
648 const int *fds, size_t num_fds)
652 ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
653 iov, iovlen, fds, num_fds);
655 return map_nt_error_from_unix(ret);
/*
 * Deep-copy a messaging_rec onto mem_ctx.
 *
 * The combined size of the payload and the fd array is computed first,
 * with a wrap-around check on the addition. The copy is allocated with
 * talloc_pooled_object() reserving room for two sub-allocations, so the
 * talloc_memdup() calls for the payload and for the fd array are served
 * from that pool (hence the "Doesn't fail" remark below). The fd array
 * is only duplicated when the record carries fds.
 */
660 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
661 struct messaging_rec *rec)
663 struct messaging_rec *result;
664 size_t fds_size = sizeof(int64_t) * rec->num_fds;
667 payload_len = rec->buf.length + fds_size;
668 if (payload_len < rec->buf.length) {
673 result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
675 if (result == NULL) {
680 /* Doesn't fail, see talloc_pooled_object */
682 result->buf.data = talloc_memdup(result, rec->buf.data,
686 if (result->num_fds > 0) {
687 result->fds = talloc_memdup(result, rec->fds, fds_size);
/*
 * State of one messaging_filtered_read request.
 */
693 struct messaging_filtered_read_state {
/* Event context the request waits on; records from other contexts are not offered to it. */
694 struct tevent_context *ev;
695 struct messaging_context *msg_ctx;
/* Registration returned by messaging_dgm_register_tevent_context(); freed in the cleanup function. */
696 struct messaging_dgm_fde *fde;
/* Caller-supplied predicate deciding whether a record completes this request. */
698 bool (*filter)(struct messaging_rec *rec, void *private_data);
/* Copy of the accepted record, handed out by messaging_filtered_read_recv(). */
701 struct messaging_rec *rec;
/* Cleanup hook, defined after messaging_filtered_read_send(). */
704 static void messaging_filtered_read_cleanup(struct tevent_req *req,
705 enum tevent_req_state req_state);
/*
 * Start waiting for the next incoming record accepted by filter.
 *
 * Creates the request, records msg_ctx, filter and private_data in its
 * state, defers the completion callback to the request's event context,
 * registers the event context with the datagram layer and appends the
 * request to msg_ctx->new_waiters (growing that array by one slot when
 * it is full). The cleanup function is installed last, after the request
 * has been entered into the array.
 *
 * On allocation failure the request is failed through tevent_req_nomem()
 * and posted on ev.
 */
707 struct tevent_req *messaging_filtered_read_send(
708 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
709 struct messaging_context *msg_ctx,
710 bool (*filter)(struct messaging_rec *rec, void *private_data),
713 struct tevent_req *req;
714 struct messaging_filtered_read_state *state;
715 size_t new_waiters_len;
717 req = tevent_req_create(mem_ctx, &state,
718 struct messaging_filtered_read_state);
723 state->msg_ctx = msg_ctx;
724 state->filter = filter;
725 state->private_data = private_data;
728 * We have to defer the callback here, as we might be called from
729 * within a different tevent_context than state->ev
731 tevent_req_defer_callback(req, state->ev);
733 state->fde = messaging_dgm_register_tevent_context(state, ev);
734 if (tevent_req_nomem(state->fde, req)) {
735 return tevent_req_post(req, ev);
739 * We add ourselves to the "new_waiters" array, not the "waiters"
740 * array. If we are called from within messaging_read_done,
741 * messaging_dispatch_rec will be in an active for-loop on
742 * "waiters". We must be careful not to mess with this array, because
743 * it could mean that a single event is being delivered twice.
746 new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
748 if (new_waiters_len == msg_ctx->num_new_waiters) {
749 struct tevent_req **tmp;
751 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
752 struct tevent_req *, new_waiters_len+1);
753 if (tevent_req_nomem(tmp, req)) {
754 return tevent_req_post(req, ev);
756 msg_ctx->new_waiters = tmp;
759 msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
760 msg_ctx->num_new_waiters += 1;
761 tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
/*
 * tevent cleanup hook of a filtered-read request.
 *
 * Unhooks itself, releases the datagram-layer registration (state->fde)
 * and clears the request's slot in msg_ctx->waiters or
 * msg_ctx->new_waiters. Slots are only set to NULL; the arrays are never
 * compacted here because messaging_dispatch_rec() may be iterating over
 * "waiters" while this runs.
 */
766 static void messaging_filtered_read_cleanup(struct tevent_req *req,
767 enum tevent_req_state req_state)
769 struct messaging_filtered_read_state *state = tevent_req_data(
770 req, struct messaging_filtered_read_state);
771 struct messaging_context *msg_ctx = state->msg_ctx;
774 tevent_req_set_cleanup_fn(req, NULL);
776 TALLOC_FREE(state->fde);
779 * Just set the [new_]waiters entry to NULL, be careful not to mess
780 * with the other "waiters" array contents. We are often called from
781 * within "messaging_dispatch_rec", which loops over
782 * "waiters". Messing with the "waiters" array will mess up that
786 for (i=0; i<msg_ctx->num_waiters; i++) {
787 if (msg_ctx->waiters[i] == req) {
788 msg_ctx->waiters[i] = NULL;
793 for (i=0; i<msg_ctx->num_new_waiters; i++) {
794 if (msg_ctx->new_waiters[i] == req) {
795 msg_ctx->new_waiters[i] = NULL;
/*
 * Complete a filtered-read request with rec: a private copy of the
 * record is stored in the request state (the caller's rec is not kept),
 * and the request is marked done. A failed copy fails the request via
 * tevent_req_nomem().
 */
801 static void messaging_filtered_read_done(struct tevent_req *req,
802 struct messaging_rec *rec)
804 struct messaging_filtered_read_state *state = tevent_req_data(
805 req, struct messaging_filtered_read_state);
807 state->rec = messaging_rec_dup(state, rec);
808 if (tevent_req_nomem(state->rec, req)) {
811 tevent_req_done(req);
/*
 * Collect the result of a filtered-read request.
 *
 * If the request ended with a Unix error, the request is marked as
 * received and (presumably) that error is returned. Otherwise, when
 * presult is not NULL, ownership of the stored record is moved to
 * mem_ctx and the record is returned through *presult.
 */
814 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
815 struct messaging_rec **presult)
817 struct messaging_filtered_read_state *state = tevent_req_data(
818 req, struct messaging_filtered_read_state);
821 if (tevent_req_is_unix_error(req, &err)) {
822 tevent_req_received(req);
825 if (presult != NULL) {
826 *presult = talloc_move(mem_ctx, &state->rec);
/*
 * State of a messaging_read request: the record received for the
 * requested message type (the type itself is stored as
 * state->msg_type by messaging_read_send()).
 */
831 struct messaging_read_state {
833 struct messaging_rec *rec;
/* Filter predicate and completion callback, defined after messaging_read_send(). */
836 static bool messaging_read_filter(struct messaging_rec *rec,
838 static void messaging_read_done(struct tevent_req *subreq);
/*
 * Start waiting for the next message of a given type.
 *
 * Thin layer over messaging_filtered_read_send(): the wanted type is
 * stored in the request state, which is also passed as private data to
 * messaging_read_filter(); messaging_read_done() is called when the
 * sub-request finishes.
 */
840 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
841 struct tevent_context *ev,
842 struct messaging_context *msg,
845 struct tevent_req *req, *subreq;
846 struct messaging_read_state *state;
848 req = tevent_req_create(mem_ctx, &state,
849 struct messaging_read_state);
853 state->msg_type = msg_type;
855 subreq = messaging_filtered_read_send(state, ev, msg,
856 messaging_read_filter, state);
857 if (tevent_req_nomem(subreq, req)) {
858 return tevent_req_post(req, ev);
860 tevent_req_set_callback(subreq, messaging_read_done, req);
/*
 * Filter used by messaging_read_send(): accepts a record when its
 * msg_type equals the type stored in the messaging_read_state passed as
 * private_data. Records that carry file descriptors are handled
 * separately first (presumably rejected; the statement is not visible
 * here).
 */
864 static bool messaging_read_filter(struct messaging_rec *rec,
867 struct messaging_read_state *state = talloc_get_type_abort(
868 private_data, struct messaging_read_state);
870 if (rec->num_fds != 0) {
874 return rec->msg_type == state->msg_type;
/*
 * Completion callback of the filtered-read sub-request: moves the
 * received record into the messaging_read state, propagates an error
 * from the sub-request via tevent_req_error(), otherwise marks the
 * request done.
 */
877 static void messaging_read_done(struct tevent_req *subreq)
879 struct tevent_req *req = tevent_req_callback_data(
880 subreq, struct tevent_req);
881 struct messaging_read_state *state = tevent_req_data(
882 req, struct messaging_read_state);
885 ret = messaging_filtered_read_recv(subreq, state, &state->rec);
887 if (tevent_req_error(req, ret)) {
890 tevent_req_done(req);
/*
 * Collect the result of a messaging_read request. A Unix error recorded
 * on the request is detected first (and presumably returned). On
 * success, when presult is not NULL, ownership of the record is moved
 * to mem_ctx and the record is returned through *presult.
 */
893 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
894 struct messaging_rec **presult)
896 struct messaging_read_state *state = tevent_req_data(
897 req, struct messaging_read_state);
900 if (tevent_req_is_unix_error(req, &err)) {
903 if (presult != NULL) {
904 *presult = talloc_move(mem_ctx, &state->rec);
/*
 * State of a long-running message handler request (see
 * messaging_handler_send()).
 */
909 struct messaging_handler_state {
910 struct tevent_context *ev;
911 struct messaging_context *msg_ctx;
/*
 * User handler, called once per received message with a pointer to the
 * record pointer and the caller's private_data.
 */
913 bool (*handler)(struct messaging_context *msg_ctx,
914 struct messaging_rec **rec, void *private_data);
/* Per-message callback, defined after messaging_handler_send(). */
918 static void messaging_handler_got_msg(struct tevent_req *subreq);
/*
 * Install a handler that is called for every message of msg_type.
 *
 * Stores the context, type, handler and private_data in the request
 * state and arms a first messaging_read_send() whose completion is
 * routed to messaging_handler_got_msg(). That callback re-arms the read
 * for each message, so one request serves many messages.
 */
920 struct tevent_req *messaging_handler_send(
921 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
922 struct messaging_context *msg_ctx, uint32_t msg_type,
923 bool (*handler)(struct messaging_context *msg_ctx,
924 struct messaging_rec **rec, void *private_data),
927 struct tevent_req *req, *subreq;
928 struct messaging_handler_state *state;
930 req = tevent_req_create(mem_ctx, &state,
931 struct messaging_handler_state);
936 state->msg_ctx = msg_ctx;
937 state->msg_type = msg_type;
938 state->handler = handler;
939 state->private_data = private_data;
941 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
943 if (tevent_req_nomem(subreq, req)) {
944 return tevent_req_post(req, ev);
946 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
/*
 * Called for each message received on behalf of a handler request.
 *
 * Order matters here: the record is collected, then the next
 * messaging_read_send() is armed, and only afterwards is the user
 * handler invoked, so a read is already pending while the handler runs.
 * The handler's boolean result is stored in "ok"; how it decides between
 * continuing and finishing the request with tevent_req_done() is not
 * visible here.
 */
950 static void messaging_handler_got_msg(struct tevent_req *subreq)
952 struct tevent_req *req = tevent_req_callback_data(
953 subreq, struct tevent_req);
954 struct messaging_handler_state *state = tevent_req_data(
955 req, struct messaging_handler_state);
956 struct messaging_rec *rec;
960 ret = messaging_read_recv(subreq, state, &rec);
962 if (tevent_req_error(req, ret)) {
966 subreq = messaging_read_send(state, state->ev, state->msg_ctx,
968 if (tevent_req_nomem(subreq, req)) {
971 tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
973 ok = state->handler(state->msg_ctx, &rec, state->private_data);
982 tevent_req_done(req);
/*
 * Collect the final result of a handler request; simply forwards to
 * tevent_req_simple_recv_unix().
 */
985 int messaging_handler_recv(struct tevent_req *req)
987 return tevent_req_simple_recv_unix(req);
/*
 * Move all pending entries from msg_ctx->new_waiters to the end of
 * msg_ctx->waiters.
 *
 * The waiters array is only reallocated when its current talloc length
 * is too small for both sets; the entries are then copied with memcpy,
 * num_waiters is increased and num_new_waiters reset to 0 (the
 * new_waiters allocation itself is kept for reuse). A failed
 * reallocation is logged at level 1; the bool result is tested by
 * messaging_dispatch_rec().
 */
990 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
992 if (msg_ctx->num_new_waiters == 0) {
996 if (talloc_array_length(msg_ctx->waiters) <
997 (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
998 struct tevent_req **tmp;
999 tmp = talloc_realloc(
1000 msg_ctx, msg_ctx->waiters, struct tevent_req *,
1001 msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1003 DEBUG(1, ("%s: talloc failed\n", __func__));
1006 msg_ctx->waiters = tmp;
1009 memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1010 sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1012 msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1013 msg_ctx->num_new_waiters = 0;
/*
 * Deliver rec to the callbacks registered with messaging_register().
 *
 * Walks msg_ctx->callbacks and, for an entry whose msg_type matches the
 * record, calls its fn with the entry's private_data, the record's
 * type, source and payload. Because these callbacks cannot take file
 * descriptors, the record's fds are walked beforehand (presumably
 * closed; the loop body is not visible here).
 *
 * The bool result is stored by messaging_dispatch_rec() as "consumed".
 */
1018 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1019 struct messaging_rec *rec)
1021 struct messaging_callback *cb, *next;
1023 for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1027 if (cb->msg_type != rec->msg_type) {
1032 * the old style callbacks don't support fd passing
1034 for (j=0; j < rec->num_fds; j++) {
1035 int fd = rec->fds[j];
1041 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1042 rec->src, &rec->buf);
/*
 * Central dispatcher for one received record.
 *
 * Stages, in order:
 *  1. If the record arrived on the main event context, offer it to the
 *     classic callbacks (messaging_dispatch_classic).
 *  2. Merge pending new_waiters into waiters; if that fails the
 *     record's fds are walked (presumably closed) instead.
 *  3. Iterate over waiters. NULL slots left behind by
 *     messaging_filtered_read_cleanup() are removed by shifting the
 *     tail down with memmove, which keeps the remaining order. A
 *     waiter is completed with the record when it waits on the same
 *     event context and its filter accepts the record.
 *  4. If the record arrived on a nested (non-main) event context, it is
 *     re-posted to ourselves through messaging_post_self() so that it
 *     is handled again from the main event context.
 *  5. A final loop walks the record's fds for the case that nobody used
 *     them.
 *
 * NOTE(review): "int fds[rec->num_fds]" is a variable length array; for
 * records without fds it has zero length, which C does not define.
 */
1051 Dispatch one messaging_rec
1053 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1054 struct tevent_context *ev,
1055 struct messaging_rec *rec)
1060 if (ev == msg_ctx->event_ctx) {
1061 consumed = messaging_dispatch_classic(msg_ctx, rec);
1067 if (!messaging_append_new_waiters(msg_ctx)) {
1069 for (j=0; j < rec->num_fds; j++) {
1070 int fd = rec->fds[j];
1079 while (i < msg_ctx->num_waiters) {
1080 struct tevent_req *req;
1081 struct messaging_filtered_read_state *state;
1083 req = msg_ctx->waiters[i];
1086 * This got cleaned up. In the meantime,
1087 * move everything down one. We need
1088 * to keep the order of waiters, as
1089 * other code may depend on this.
1091 if (i < msg_ctx->num_waiters - 1) {
1092 memmove(&msg_ctx->waiters[i],
1093 &msg_ctx->waiters[i+1],
1094 sizeof(struct tevent_req *) *
1095 (msg_ctx->num_waiters - i - 1));
1097 msg_ctx->num_waiters -= 1;
1101 state = tevent_req_data(
1102 req, struct messaging_filtered_read_state);
1103 if ((ev == state->ev) &&
1104 state->filter(rec, state->private_data)) {
1105 messaging_filtered_read_done(req, rec);
1112 if (ev != msg_ctx->event_ctx) {
1114 int fds[rec->num_fds];
1118 * We've been listening on a nested event
1119 * context. Messages need to be handled in the main
1120 * event context, so post to ourselves
1123 iov.iov_base = rec->buf.data;
1124 iov.iov_len = rec->buf.length;
1126 for (i=0; i<rec->num_fds; i++) {
1127 fds[i] = rec->fds[i];
1130 ret = messaging_post_self(
1131 msg_ctx, rec->src, rec->dest, rec->msg_type,
1132 &iov, 1, fds, rec->num_fds);
1139 * If the fd-array isn't used, just close it.
1141 for (i=0; i < rec->num_fds; i++) {
1142 int fd = rec->fds[i];
/*
 * Forward declarations for the periodic datagram cleanup job: the job
 * function and the callback that runs when the job ends.
 */
1149 static int mess_parent_dgm_cleanup(void *private_data);
1150 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
/*
 * Start the background job that periodically cleans up the datagram
 * messaging state.
 *
 * The job is created with background_job_send() on the context's event
 * context, runs mess_parent_dgm_cleanup() and takes its initial interval
 * from the "messaging:messaging dgm cleanup interval" parameter.
 * mess_parent_dgm_cleanup_done() is installed as completion callback.
 */
1152 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1154 struct tevent_req *req;
1156 req = background_job_send(
1157 msg, msg->event_ctx, msg, NULL, 0,
1158 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1160 mess_parent_dgm_cleanup, msg);
1164 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Body of the background cleanup job: runs messaging_dgm_wipe(), logs
 * its result at debug level 10 and returns the current value of the
 * "messaging:messaging dgm cleanup interval" parameter (presumably the
 * delay until the next run; confirm against background_job_send()).
 */
1168 static int mess_parent_dgm_cleanup(void *private_data)
1172 ret = messaging_dgm_wipe();
1173 DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1174 ret ? strerror(ret) : "ok"));
1175 return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
/*
 * Called when the background cleanup job has ended. Logs the job's
 * final status at level 1 and starts a new job with the same parameters
 * as messaging_parent_dgm_cleanup_init(), re-installing itself as
 * callback. A failure to start the new job is logged; no further retry
 * is visible here.
 */
1179 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1181 struct messaging_context *msg = tevent_req_callback_data(
1182 req, struct messaging_context);
1185 status = background_job_recv(req);
1187 DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1188 nt_errstr(status)));
1190 req = background_job_send(
1191 msg, msg->event_ctx, msg, NULL, 0,
1192 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1194 mess_parent_dgm_cleanup, msg);
1196 DEBUG(1, ("background_job_send failed\n"));
1199 tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
/*
 * Clean up datagram messaging state, either globally via
 * messaging_dgm_wipe() or for one process via
 * messaging_dgm_cleanup(pid). The condition that selects between the two
 * is not visible here (presumably a pid of 0 means "wipe"; TODO
 * confirm).
 */
1202 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1207 ret = messaging_dgm_wipe();
1209 ret = messaging_dgm_cleanup(pid);
/*
 * Accessor: returns the main event context stored in the messaging
 * context.
 */
1215 struct tevent_context *messaging_tevent_context(
1216 struct messaging_context *msg_ctx)
1218 return msg_ctx->event_ctx;
/*
 * Accessor: returns the names database handle stored in the messaging
 * context.
 */
1221 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1223 return msg_ctx->names_db;