1263bf1698c15c07b010d35b0e82acf6d854fd82
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_backend *local;
76         struct messaging_backend *remote;
77
78         bool *have_context;
79 };
80
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
82
83 /****************************************************************************
84  A useful function for testing the message system.
85 ****************************************************************************/
86
87 static void ping_message(struct messaging_context *msg_ctx,
88                          void *private_data,
89                          uint32_t msg_type,
90                          struct server_id src,
91                          DATA_BLOB *data)
92 {
93         const char *msg = "none";
94         char *free_me = NULL;
95
96         if (data->data != NULL) {
97                 free_me = talloc_strndup(talloc_tos(), (char *)data->data,
98                                          data->length);
99                 msg = free_me;
100         }
101         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
102                  procid_str_static(&src), msg));
103         TALLOC_FREE(free_me);
104         messaging_send(msg_ctx, src, MSG_PONG, data);
105 }
106
107 /****************************************************************************
108  Register/replace a dispatch function for a particular message type.
109  JRA changed Dec 13 2006. Only one message handler now permitted per type.
110  *NOTE*: Dispatch functions must be able to cope with incoming
111  messages on an *odd* byte boundary.
112 ****************************************************************************/
113
114 struct msg_all {
115         struct messaging_context *msg_ctx;
116         int msg_type;
117         uint32 msg_flag;
118         const void *buf;
119         size_t len;
120         int n_sent;
121 };
122
123 /****************************************************************************
124  Send one of the messages for the broadcast.
125 ****************************************************************************/
126
127 static int traverse_fn(struct db_record *rec, const struct server_id *id,
128                        uint32_t msg_flags, void *state)
129 {
130         struct msg_all *msg_all = (struct msg_all *)state;
131         NTSTATUS status;
132
133         /* Don't send if the receiver hasn't registered an interest. */
134
135         if((msg_flags & msg_all->msg_flag) == 0) {
136                 return 0;
137         }
138
139         /* If the msg send fails because the pid was not found (i.e. smbd died), 
140          * the msg has already been deleted from the messages.tdb.*/
141
142         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
143                                     (const uint8_t *)msg_all->buf, msg_all->len);
144
145         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
146
147                 /*
148                  * If the pid was not found delete the entry from
149                  * serverid.tdb
150                  */
151
152                 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
153
154                 dbwrap_record_delete(rec);
155         }
156         msg_all->n_sent++;
157         return 0;
158 }
159
160 /**
161  * Send a message to all smbd processes.
162  *
163  * It isn't very efficient, but should be OK for the sorts of
164  * applications that use it. When we need efficient broadcast we can add
165  * it.
166  *
167  * @param n_sent Set to the number of messages sent.  This should be
168  * equal to the number of processes, but be careful for races.
169  *
170  * @retval True for success.
171  **/
172 bool message_send_all(struct messaging_context *msg_ctx,
173                       int msg_type,
174                       const void *buf, size_t len,
175                       int *n_sent)
176 {
177         struct msg_all msg_all;
178
179         msg_all.msg_type = msg_type;
180         if (msg_type < 0x100) {
181                 msg_all.msg_flag = FLAG_MSG_GENERAL;
182         } else if (msg_type > 0x100 && msg_type < 0x200) {
183                 msg_all.msg_flag = FLAG_MSG_NMBD;
184         } else if (msg_type > 0x200 && msg_type < 0x300) {
185                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186         } else if (msg_type > 0x300 && msg_type < 0x400) {
187                 msg_all.msg_flag = FLAG_MSG_SMBD;
188         } else if (msg_type > 0x400 && msg_type < 0x600) {
189                 msg_all.msg_flag = FLAG_MSG_WINBIND;
190         } else if (msg_type > 4000 && msg_type < 5000) {
191                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192         } else {
193                 return false;
194         }
195
196         msg_all.buf = buf;
197         msg_all.len = len;
198         msg_all.n_sent = 0;
199         msg_all.msg_ctx = msg_ctx;
200
201         serverid_traverse(traverse_fn, &msg_all);
202         if (n_sent)
203                 *n_sent = msg_all.n_sent;
204         return true;
205 }
206
207 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
208                                          struct tevent_context *ev)
209 {
210         struct messaging_context *ctx;
211         NTSTATUS status;
212         static bool have_context = false;
213
214         if (have_context) {
215                 DEBUG(0, ("No two messaging contexts per process\n"));
216                 return NULL;
217         }
218
219
220         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
221                 return NULL;
222         }
223
224         ctx->id = procid_self();
225         ctx->event_ctx = ev;
226         ctx->have_context = &have_context;
227
228         status = messaging_dgm_init(ctx, ctx, &ctx->local);
229
230         if (!NT_STATUS_IS_OK(status)) {
231                 DEBUG(2, ("messaging_dgm_init failed: %s\n",
232                           nt_errstr(status)));
233                 TALLOC_FREE(ctx);
234                 return NULL;
235         }
236
237         if (lp_clustering()) {
238                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
239
240                 if (!NT_STATUS_IS_OK(status)) {
241                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
242                                   nt_errstr(status)));
243                         TALLOC_FREE(ctx);
244                         return NULL;
245                 }
246         }
247         ctx->id.vnn = get_my_vnn();
248
249         messaging_register(ctx, NULL, MSG_PING, ping_message);
250
251         /* Register some debugging related messages */
252
253         register_msg_pool_usage(ctx);
254         register_dmalloc_msgs(ctx);
255         debug_register_msgs(ctx);
256
257         have_context = true;
258         talloc_set_destructor(ctx, messaging_context_destructor);
259
260         return ctx;
261 }
262
263 static int messaging_context_destructor(struct messaging_context *msg_ctx)
264 {
265         SMB_ASSERT(*msg_ctx->have_context);
266         *msg_ctx->have_context = false;
267         return 0;
268 }
269
270 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
271 {
272         return msg_ctx->id;
273 }
274
275 /*
276  * re-init after a fork
277  */
278 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
279 {
280         NTSTATUS status;
281
282         TALLOC_FREE(msg_ctx->local);
283
284         msg_ctx->id = procid_self();
285
286         status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
287         if (!NT_STATUS_IS_OK(status)) {
288                 DEBUG(0, ("messaging_dgm_init failed: %s\n",
289                           nt_errstr(status)));
290                 return status;
291         }
292
293         TALLOC_FREE(msg_ctx->remote);
294
295         if (lp_clustering()) {
296                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
297                                               &msg_ctx->remote);
298
299                 if (!NT_STATUS_IS_OK(status)) {
300                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
301                                   nt_errstr(status)));
302                         return status;
303                 }
304         }
305
306         return NT_STATUS_OK;
307 }
308
309
310 /*
311  * Register a dispatch function for a particular message type. Allow multiple
312  * registrants
313 */
314 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
315                             void *private_data,
316                             uint32_t msg_type,
317                             void (*fn)(struct messaging_context *msg,
318                                        void *private_data, 
319                                        uint32_t msg_type, 
320                                        struct server_id server_id,
321                                        DATA_BLOB *data))
322 {
323         struct messaging_callback *cb;
324
325         DEBUG(5, ("Registering messaging pointer for type %u - "
326                   "private_data=%p\n",
327                   (unsigned)msg_type, private_data));
328
329         /*
330          * Only one callback per type
331          */
332
333         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
334                 /* we allow a second registration of the same message
335                    type if it has a different private pointer. This is
336                    needed in, for example, the internal notify code,
337                    which creates a new notify context for each tree
338                    connect, and expects to receive messages to each of
339                    them. */
340                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
341                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
342                                   (unsigned)msg_type, private_data));
343                         cb->fn = fn;
344                         cb->private_data = private_data;
345                         return NT_STATUS_OK;
346                 }
347         }
348
349         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
350                 return NT_STATUS_NO_MEMORY;
351         }
352
353         cb->msg_type = msg_type;
354         cb->fn = fn;
355         cb->private_data = private_data;
356
357         DLIST_ADD(msg_ctx->callbacks, cb);
358         return NT_STATUS_OK;
359 }
360
361 /*
362   De-register the function for a particular message type.
363 */
364 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
365                           void *private_data)
366 {
367         struct messaging_callback *cb, *next;
368
369         for (cb = ctx->callbacks; cb; cb = next) {
370                 next = cb->next;
371                 if ((cb->msg_type == msg_type)
372                     && (cb->private_data == private_data)) {
373                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
374                                   (unsigned)msg_type, private_data));
375                         DLIST_REMOVE(ctx->callbacks, cb);
376                         TALLOC_FREE(cb);
377                 }
378         }
379 }
380
381 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
382                                    const struct server_id *dst)
383 {
384         return ((msg_ctx->id.vnn == dst->vnn) &&
385                 (msg_ctx->id.pid == dst->pid));
386 }
387
388 /*
389   Send a message to a particular server
390 */
391 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
392                         struct server_id server, uint32_t msg_type,
393                         const DATA_BLOB *data)
394 {
395         struct iovec iov;
396
397         iov.iov_base = data->data;
398         iov.iov_len = data->length;
399
400         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
401 }
402
403 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
404                             struct server_id server, uint32_t msg_type,
405                             const uint8_t *buf, size_t len)
406 {
407         DATA_BLOB blob = data_blob_const(buf, len);
408         return messaging_send(msg_ctx, server, msg_type, &blob);
409 }
410
411 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
412                             struct server_id server, uint32_t msg_type,
413                             const struct iovec *iov, int iovlen)
414 {
415         if (server_id_is_disconnected(&server)) {
416                 return NT_STATUS_INVALID_PARAMETER_MIX;
417         }
418
419         if (!procid_is_local(&server)) {
420                 return msg_ctx->remote->send_fn(msg_ctx->id, server,
421                                                 msg_type, iov, iovlen,
422                                                 msg_ctx->remote);
423         }
424
425         if (messaging_is_self_send(msg_ctx, &server)) {
426                 struct messaging_rec rec;
427                 uint8_t *buf;
428                 DATA_BLOB data;
429
430                 buf = iov_buf(talloc_tos(), iov, iovlen);
431                 if (buf == NULL) {
432                         return NT_STATUS_NO_MEMORY;
433                 }
434
435                 data = data_blob_const(buf, talloc_get_size(buf));
436
437                 rec.msg_version = MESSAGE_VERSION;
438                 rec.msg_type = msg_type & MSG_TYPE_MASK;
439                 rec.dest = server;
440                 rec.src = msg_ctx->id;
441                 rec.buf = data;
442                 messaging_dispatch_rec(msg_ctx, &rec);
443                 TALLOC_FREE(buf);
444                 return NT_STATUS_OK;
445         }
446
447         return msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
448                                        iov, iovlen, msg_ctx->local);
449 }
450
451 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
452                                                struct messaging_rec *rec)
453 {
454         struct messaging_rec *result;
455
456         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
457                                       1, rec->buf.length);
458         if (result == NULL) {
459                 return NULL;
460         }
461         *result = *rec;
462
463         /* Doesn't fail, see talloc_pooled_object */
464
465         result->buf.data = talloc_memdup(result, rec->buf.data,
466                                          rec->buf.length);
467         return result;
468 }
469
470 struct messaging_filtered_read_state {
471         struct tevent_context *ev;
472         struct messaging_context *msg_ctx;
473         void *tevent_handle;
474
475         bool (*filter)(struct messaging_rec *rec, void *private_data);
476         void *private_data;
477
478         struct messaging_rec *rec;
479 };
480
481 static void messaging_filtered_read_cleanup(struct tevent_req *req,
482                                             enum tevent_req_state req_state);
483
484 struct tevent_req *messaging_filtered_read_send(
485         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
486         struct messaging_context *msg_ctx,
487         bool (*filter)(struct messaging_rec *rec, void *private_data),
488         void *private_data)
489 {
490         struct tevent_req *req;
491         struct messaging_filtered_read_state *state;
492         size_t new_waiters_len;
493
494         req = tevent_req_create(mem_ctx, &state,
495                                 struct messaging_filtered_read_state);
496         if (req == NULL) {
497                 return NULL;
498         }
499         state->ev = ev;
500         state->msg_ctx = msg_ctx;
501         state->filter = filter;
502         state->private_data = private_data;
503
504         /*
505          * We have to defer the callback here, as we might be called from
506          * within a different tevent_context than state->ev
507          */
508         tevent_req_defer_callback(req, state->ev);
509
510         state->tevent_handle = messaging_dgm_register_tevent_context(
511                 state, msg_ctx, ev);
512         if (tevent_req_nomem(state, req)) {
513                 return tevent_req_post(req, ev);
514         }
515
516         /*
517          * We add ourselves to the "new_waiters" array, not the "waiters"
518          * array. If we are called from within messaging_read_done,
519          * messaging_dispatch_rec will be in an active for-loop on
520          * "waiters". We must be careful not to mess with this array, because
521          * it could mean that a single event is being delivered twice.
522          */
523
524         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
525
526         if (new_waiters_len == msg_ctx->num_new_waiters) {
527                 struct tevent_req **tmp;
528
529                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
530                                      struct tevent_req *, new_waiters_len+1);
531                 if (tevent_req_nomem(tmp, req)) {
532                         return tevent_req_post(req, ev);
533                 }
534                 msg_ctx->new_waiters = tmp;
535         }
536
537         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
538         msg_ctx->num_new_waiters += 1;
539         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
540
541         return req;
542 }
543
544 static void messaging_filtered_read_cleanup(struct tevent_req *req,
545                                             enum tevent_req_state req_state)
546 {
547         struct messaging_filtered_read_state *state = tevent_req_data(
548                 req, struct messaging_filtered_read_state);
549         struct messaging_context *msg_ctx = state->msg_ctx;
550         unsigned i;
551
552         tevent_req_set_cleanup_fn(req, NULL);
553
554         TALLOC_FREE(state->tevent_handle);
555
556         /*
557          * Just set the [new_]waiters entry to NULL, be careful not to mess
558          * with the other "waiters" array contents. We are often called from
559          * within "messaging_dispatch_rec", which loops over
560          * "waiters". Messing with the "waiters" array will mess up that
561          * for-loop.
562          */
563
564         for (i=0; i<msg_ctx->num_waiters; i++) {
565                 if (msg_ctx->waiters[i] == req) {
566                         msg_ctx->waiters[i] = NULL;
567                         return;
568                 }
569         }
570
571         for (i=0; i<msg_ctx->num_new_waiters; i++) {
572                 if (msg_ctx->new_waiters[i] == req) {
573                         msg_ctx->new_waiters[i] = NULL;
574                         return;
575                 }
576         }
577 }
578
579 static void messaging_filtered_read_done(struct tevent_req *req,
580                                          struct messaging_rec *rec)
581 {
582         struct messaging_filtered_read_state *state = tevent_req_data(
583                 req, struct messaging_filtered_read_state);
584
585         state->rec = messaging_rec_dup(state, rec);
586         if (tevent_req_nomem(state->rec, req)) {
587                 return;
588         }
589         tevent_req_done(req);
590 }
591
592 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
593                                  struct messaging_rec **presult)
594 {
595         struct messaging_filtered_read_state *state = tevent_req_data(
596                 req, struct messaging_filtered_read_state);
597         int err;
598
599         if (tevent_req_is_unix_error(req, &err)) {
600                 tevent_req_received(req);
601                 return err;
602         }
603         *presult = talloc_move(mem_ctx, &state->rec);
604         return 0;
605 }
606
607 struct messaging_read_state {
608         uint32_t msg_type;
609         struct messaging_rec *rec;
610 };
611
612 static bool messaging_read_filter(struct messaging_rec *rec,
613                                   void *private_data);
614 static void messaging_read_done(struct tevent_req *subreq);
615
616 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
617                                        struct tevent_context *ev,
618                                        struct messaging_context *msg,
619                                        uint32_t msg_type)
620 {
621         struct tevent_req *req, *subreq;
622         struct messaging_read_state *state;
623
624         req = tevent_req_create(mem_ctx, &state,
625                                 struct messaging_read_state);
626         if (req == NULL) {
627                 return NULL;
628         }
629         state->msg_type = msg_type;
630
631         subreq = messaging_filtered_read_send(state, ev, msg,
632                                               messaging_read_filter, state);
633         if (tevent_req_nomem(subreq, req)) {
634                 return tevent_req_post(req, ev);
635         }
636         tevent_req_set_callback(subreq, messaging_read_done, req);
637         return req;
638 }
639
640 static bool messaging_read_filter(struct messaging_rec *rec,
641                                   void *private_data)
642 {
643         struct messaging_read_state *state = talloc_get_type_abort(
644                 private_data, struct messaging_read_state);
645
646         return rec->msg_type == state->msg_type;
647 }
648
649 static void messaging_read_done(struct tevent_req *subreq)
650 {
651         struct tevent_req *req = tevent_req_callback_data(
652                 subreq, struct tevent_req);
653         struct messaging_read_state *state = tevent_req_data(
654                 req, struct messaging_read_state);
655         int ret;
656
657         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
658         TALLOC_FREE(subreq);
659         if (tevent_req_error(req, ret)) {
660                 return;
661         }
662         tevent_req_done(req);
663 }
664
665 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
666                         struct messaging_rec **presult)
667 {
668         struct messaging_read_state *state = tevent_req_data(
669                 req, struct messaging_read_state);
670         int err;
671
672         if (tevent_req_is_unix_error(req, &err)) {
673                 return err;
674         }
675         if (presult != NULL) {
676                 *presult = talloc_move(mem_ctx, &state->rec);
677         }
678         return 0;
679 }
680
681 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
682 {
683         if (msg_ctx->num_new_waiters == 0) {
684                 return true;
685         }
686
687         if (talloc_array_length(msg_ctx->waiters) <
688             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
689                 struct tevent_req **tmp;
690                 tmp = talloc_realloc(
691                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
692                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
693                 if (tmp == NULL) {
694                         DEBUG(1, ("%s: talloc failed\n", __func__));
695                         return false;
696                 }
697                 msg_ctx->waiters = tmp;
698         }
699
700         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
701                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
702
703         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
704         msg_ctx->num_new_waiters = 0;
705
706         return true;
707 }
708
709 struct messaging_defer_callback_state {
710         struct messaging_context *msg_ctx;
711         struct messaging_rec *rec;
712         void (*fn)(struct messaging_context *msg, void *private_data,
713                    uint32_t msg_type, struct server_id server_id,
714                    DATA_BLOB *data);
715         void *private_data;
716 };
717
718 static void messaging_defer_callback_trigger(struct tevent_context *ev,
719                                              struct tevent_immediate *im,
720                                              void *private_data);
721
722 static void messaging_defer_callback(
723         struct messaging_context *msg_ctx, struct messaging_rec *rec,
724         void (*fn)(struct messaging_context *msg, void *private_data,
725                    uint32_t msg_type, struct server_id server_id,
726                    DATA_BLOB *data),
727         void *private_data)
728 {
729         struct messaging_defer_callback_state *state;
730         struct tevent_immediate *im;
731
732         state = talloc(msg_ctx, struct messaging_defer_callback_state);
733         if (state == NULL) {
734                 DEBUG(1, ("talloc failed\n"));
735                 return;
736         }
737         state->msg_ctx = msg_ctx;
738         state->fn = fn;
739         state->private_data = private_data;
740
741         state->rec = messaging_rec_dup(state, rec);
742         if (state->rec == NULL) {
743                 DEBUG(1, ("talloc failed\n"));
744                 TALLOC_FREE(state);
745                 return;
746         }
747
748         im = tevent_create_immediate(state);
749         if (im == NULL) {
750                 DEBUG(1, ("tevent_create_immediate failed\n"));
751                 TALLOC_FREE(state);
752                 return;
753         }
754         tevent_schedule_immediate(im, msg_ctx->event_ctx,
755                                   messaging_defer_callback_trigger, state);
756 }
757
758 static void messaging_defer_callback_trigger(struct tevent_context *ev,
759                                              struct tevent_immediate *im,
760                                              void *private_data)
761 {
762         struct messaging_defer_callback_state *state = talloc_get_type_abort(
763                 private_data, struct messaging_defer_callback_state);
764         struct messaging_rec *rec = state->rec;
765
766         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
767                   &rec->buf);
768         TALLOC_FREE(state);
769 }
770
771 /*
772   Dispatch one messaging_rec
773 */
774 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
775                             struct messaging_rec *rec)
776 {
777         struct messaging_callback *cb, *next;
778         unsigned i;
779
780         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
781                 next = cb->next;
782                 if (cb->msg_type != rec->msg_type) {
783                         continue;
784                 }
785
786                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
787                         /*
788                          * This is a self-send. We are called here from
789                          * messaging_send(), and we don't want to directly
790                          * recurse into the callback but go via a
791                          * tevent_loop_once
792                          */
793                         messaging_defer_callback(msg_ctx, rec, cb->fn,
794                                                  cb->private_data);
795                 } else {
796                         /*
797                          * This comes from a different process. we are called
798                          * from the event loop, so we should call back
799                          * directly.
800                          */
801                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
802                                rec->src, &rec->buf);
803                 }
804                 /*
805                  * we continue looking for matching messages after finding
806                  * one. This matters for subsystems like the internal notify
807                  * code which register more than one handler for the same
808                  * message type
809                  */
810         }
811
812         if (!messaging_append_new_waiters(msg_ctx)) {
813                 return;
814         }
815
816         i = 0;
817         while (i < msg_ctx->num_waiters) {
818                 struct tevent_req *req;
819                 struct messaging_filtered_read_state *state;
820
821                 req = msg_ctx->waiters[i];
822                 if (req == NULL) {
823                         /*
824                          * This got cleaned up. In the meantime,
825                          * move everything down one. We need
826                          * to keep the order of waiters, as
827                          * other code may depend on this.
828                          */
829                         if (i < msg_ctx->num_waiters - 1) {
830                                 memmove(&msg_ctx->waiters[i],
831                                         &msg_ctx->waiters[i+1],
832                                         sizeof(struct tevent_req *) *
833                                             (msg_ctx->num_waiters - i - 1));
834                         }
835                         msg_ctx->num_waiters -= 1;
836                         continue;
837                 }
838
839                 state = tevent_req_data(
840                         req, struct messaging_filtered_read_state);
841                 if (state->filter(rec, state->private_data)) {
842                         messaging_filtered_read_done(req, rec);
843                 }
844
845                 i += 1;
846         }
847 }
848
849 static int mess_parent_dgm_cleanup(void *private_data);
850 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
851
852 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
853 {
854         struct tevent_req *req;
855
856         req = background_job_send(
857                 msg, msg->event_ctx, msg, NULL, 0,
858                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
859                             60*15),
860                 mess_parent_dgm_cleanup, msg);
861         if (req == NULL) {
862                 return false;
863         }
864         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
865         return true;
866 }
867
868 static int mess_parent_dgm_cleanup(void *private_data)
869 {
870         struct messaging_context *msg_ctx = talloc_get_type_abort(
871                 private_data, struct messaging_context);
872         NTSTATUS status;
873
874         status = messaging_dgm_wipe(msg_ctx);
875         DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
876         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
877                            60*15);
878 }
879
880 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
881 {
882         struct messaging_context *msg = tevent_req_callback_data(
883                 req, struct messaging_context);
884         NTSTATUS status;
885
886         status = background_job_recv(req);
887         TALLOC_FREE(req);
888         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
889                   nt_errstr(status)));
890
891         req = background_job_send(
892                 msg, msg->event_ctx, msg, NULL, 0,
893                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
894                             60*15),
895                 mess_parent_dgm_cleanup, msg);
896         if (req == NULL) {
897                 DEBUG(1, ("background_job_send failed\n"));
898         }
899         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
900 }
901
902 struct messaging_backend *messaging_local_backend(
903         struct messaging_context *msg_ctx)
904 {
905         return msg_ctx->local;
906 }
907
908 struct tevent_context *messaging_tevent_context(
909         struct messaging_context *msg_ctx)
910 {
911         return msg_ctx->event_ctx;
912 }
913
914 /** @} **/