messaging3: Make messaging_dgm_init return 0/errno
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_backend *local;
76         struct messaging_backend *remote;
77
78         bool *have_context;
79 };
80
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
82
83 /****************************************************************************
84  A useful function for testing the message system.
85 ****************************************************************************/
86
87 static void ping_message(struct messaging_context *msg_ctx,
88                          void *private_data,
89                          uint32_t msg_type,
90                          struct server_id src,
91                          DATA_BLOB *data)
92 {
93         const char *msg = "none";
94         char *free_me = NULL;
95
96         if (data->data != NULL) {
97                 free_me = talloc_strndup(talloc_tos(), (char *)data->data,
98                                          data->length);
99                 msg = free_me;
100         }
101         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
102                  procid_str_static(&src), msg));
103         TALLOC_FREE(free_me);
104         messaging_send(msg_ctx, src, MSG_PONG, data);
105 }
106
107 /****************************************************************************
108  Register/replace a dispatch function for a particular message type.
109  JRA changed Dec 13 2006. Only one message handler now permitted per type.
110  *NOTE*: Dispatch functions must be able to cope with incoming
111  messages on an *odd* byte boundary.
112 ****************************************************************************/
113
114 struct msg_all {
115         struct messaging_context *msg_ctx;
116         int msg_type;
117         uint32 msg_flag;
118         const void *buf;
119         size_t len;
120         int n_sent;
121 };
122
123 /****************************************************************************
124  Send one of the messages for the broadcast.
125 ****************************************************************************/
126
127 static int traverse_fn(struct db_record *rec, const struct server_id *id,
128                        uint32_t msg_flags, void *state)
129 {
130         struct msg_all *msg_all = (struct msg_all *)state;
131         NTSTATUS status;
132
133         /* Don't send if the receiver hasn't registered an interest. */
134
135         if((msg_flags & msg_all->msg_flag) == 0) {
136                 return 0;
137         }
138
139         /* If the msg send fails because the pid was not found (i.e. smbd died), 
140          * the msg has already been deleted from the messages.tdb.*/
141
142         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
143                                     (const uint8_t *)msg_all->buf, msg_all->len);
144
145         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
146
147                 /*
148                  * If the pid was not found delete the entry from
149                  * serverid.tdb
150                  */
151
152                 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
153
154                 dbwrap_record_delete(rec);
155         }
156         msg_all->n_sent++;
157         return 0;
158 }
159
160 /**
161  * Send a message to all smbd processes.
162  *
163  * It isn't very efficient, but should be OK for the sorts of
164  * applications that use it. When we need efficient broadcast we can add
165  * it.
166  *
167  * @param n_sent Set to the number of messages sent.  This should be
168  * equal to the number of processes, but be careful for races.
169  *
170  * @retval True for success.
171  **/
172 bool message_send_all(struct messaging_context *msg_ctx,
173                       int msg_type,
174                       const void *buf, size_t len,
175                       int *n_sent)
176 {
177         struct msg_all msg_all;
178
179         msg_all.msg_type = msg_type;
180         if (msg_type < 0x100) {
181                 msg_all.msg_flag = FLAG_MSG_GENERAL;
182         } else if (msg_type > 0x100 && msg_type < 0x200) {
183                 msg_all.msg_flag = FLAG_MSG_NMBD;
184         } else if (msg_type > 0x200 && msg_type < 0x300) {
185                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186         } else if (msg_type > 0x300 && msg_type < 0x400) {
187                 msg_all.msg_flag = FLAG_MSG_SMBD;
188         } else if (msg_type > 0x400 && msg_type < 0x600) {
189                 msg_all.msg_flag = FLAG_MSG_WINBIND;
190         } else if (msg_type > 4000 && msg_type < 5000) {
191                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192         } else {
193                 return false;
194         }
195
196         msg_all.buf = buf;
197         msg_all.len = len;
198         msg_all.n_sent = 0;
199         msg_all.msg_ctx = msg_ctx;
200
201         serverid_traverse(traverse_fn, &msg_all);
202         if (n_sent)
203                 *n_sent = msg_all.n_sent;
204         return true;
205 }
206
207 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
208                                          struct tevent_context *ev)
209 {
210         struct messaging_context *ctx;
211         NTSTATUS status;
212         int ret;
213         static bool have_context = false;
214
215         if (have_context) {
216                 DEBUG(0, ("No two messaging contexts per process\n"));
217                 return NULL;
218         }
219
220
221         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
222                 return NULL;
223         }
224
225         ctx->id = procid_self();
226         ctx->event_ctx = ev;
227         ctx->have_context = &have_context;
228
229         ret = messaging_dgm_init(ctx, ctx, &ctx->local);
230
231         if (ret != 0) {
232                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
233                 TALLOC_FREE(ctx);
234                 return NULL;
235         }
236
237         if (lp_clustering()) {
238                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
239
240                 if (!NT_STATUS_IS_OK(status)) {
241                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
242                                   nt_errstr(status)));
243                         TALLOC_FREE(ctx);
244                         return NULL;
245                 }
246         }
247         ctx->id.vnn = get_my_vnn();
248
249         messaging_register(ctx, NULL, MSG_PING, ping_message);
250
251         /* Register some debugging related messages */
252
253         register_msg_pool_usage(ctx);
254         register_dmalloc_msgs(ctx);
255         debug_register_msgs(ctx);
256
257         have_context = true;
258         talloc_set_destructor(ctx, messaging_context_destructor);
259
260         return ctx;
261 }
262
263 static int messaging_context_destructor(struct messaging_context *msg_ctx)
264 {
265         SMB_ASSERT(*msg_ctx->have_context);
266         *msg_ctx->have_context = false;
267         return 0;
268 }
269
270 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
271 {
272         return msg_ctx->id;
273 }
274
275 /*
276  * re-init after a fork
277  */
278 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
279 {
280         NTSTATUS status;
281         int ret;
282
283         TALLOC_FREE(msg_ctx->local);
284
285         msg_ctx->id = procid_self();
286
287         ret = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
288         if (ret != 0) {
289                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
290                 return map_nt_error_from_unix(ret);
291         }
292
293         TALLOC_FREE(msg_ctx->remote);
294
295         if (lp_clustering()) {
296                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
297                                               &msg_ctx->remote);
298
299                 if (!NT_STATUS_IS_OK(status)) {
300                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
301                                   nt_errstr(status)));
302                         return status;
303                 }
304         }
305
306         return NT_STATUS_OK;
307 }
308
309
310 /*
311  * Register a dispatch function for a particular message type. Allow multiple
312  * registrants
313 */
314 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
315                             void *private_data,
316                             uint32_t msg_type,
317                             void (*fn)(struct messaging_context *msg,
318                                        void *private_data, 
319                                        uint32_t msg_type, 
320                                        struct server_id server_id,
321                                        DATA_BLOB *data))
322 {
323         struct messaging_callback *cb;
324
325         DEBUG(5, ("Registering messaging pointer for type %u - "
326                   "private_data=%p\n",
327                   (unsigned)msg_type, private_data));
328
329         /*
330          * Only one callback per type
331          */
332
333         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
334                 /* we allow a second registration of the same message
335                    type if it has a different private pointer. This is
336                    needed in, for example, the internal notify code,
337                    which creates a new notify context for each tree
338                    connect, and expects to receive messages to each of
339                    them. */
340                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
341                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
342                                   (unsigned)msg_type, private_data));
343                         cb->fn = fn;
344                         cb->private_data = private_data;
345                         return NT_STATUS_OK;
346                 }
347         }
348
349         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
350                 return NT_STATUS_NO_MEMORY;
351         }
352
353         cb->msg_type = msg_type;
354         cb->fn = fn;
355         cb->private_data = private_data;
356
357         DLIST_ADD(msg_ctx->callbacks, cb);
358         return NT_STATUS_OK;
359 }
360
361 /*
362   De-register the function for a particular message type.
363 */
364 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
365                           void *private_data)
366 {
367         struct messaging_callback *cb, *next;
368
369         for (cb = ctx->callbacks; cb; cb = next) {
370                 next = cb->next;
371                 if ((cb->msg_type == msg_type)
372                     && (cb->private_data == private_data)) {
373                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
374                                   (unsigned)msg_type, private_data));
375                         DLIST_REMOVE(ctx->callbacks, cb);
376                         TALLOC_FREE(cb);
377                 }
378         }
379 }
380
381 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
382                                    const struct server_id *dst)
383 {
384         return ((msg_ctx->id.vnn == dst->vnn) &&
385                 (msg_ctx->id.pid == dst->pid));
386 }
387
388 /*
389   Send a message to a particular server
390 */
391 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
392                         struct server_id server, uint32_t msg_type,
393                         const DATA_BLOB *data)
394 {
395         struct iovec iov;
396
397         iov.iov_base = data->data;
398         iov.iov_len = data->length;
399
400         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
401 }
402
403 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
404                             struct server_id server, uint32_t msg_type,
405                             const uint8_t *buf, size_t len)
406 {
407         DATA_BLOB blob = data_blob_const(buf, len);
408         return messaging_send(msg_ctx, server, msg_type, &blob);
409 }
410
411 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
412                             struct server_id server, uint32_t msg_type,
413                             const struct iovec *iov, int iovlen)
414 {
415         int ret;
416
417         if (server_id_is_disconnected(&server)) {
418                 return NT_STATUS_INVALID_PARAMETER_MIX;
419         }
420
421         if (!procid_is_local(&server)) {
422                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
423                                                msg_type, iov, iovlen,
424                                                msg_ctx->remote);
425                 if (ret != 0) {
426                         return map_nt_error_from_unix(ret);
427                 }
428                 return NT_STATUS_OK;
429         }
430
431         if (messaging_is_self_send(msg_ctx, &server)) {
432                 struct messaging_rec rec;
433                 uint8_t *buf;
434                 DATA_BLOB data;
435
436                 buf = iov_buf(talloc_tos(), iov, iovlen);
437                 if (buf == NULL) {
438                         return NT_STATUS_NO_MEMORY;
439                 }
440
441                 data = data_blob_const(buf, talloc_get_size(buf));
442
443                 rec.msg_version = MESSAGE_VERSION;
444                 rec.msg_type = msg_type & MSG_TYPE_MASK;
445                 rec.dest = server;
446                 rec.src = msg_ctx->id;
447                 rec.buf = data;
448                 messaging_dispatch_rec(msg_ctx, &rec);
449                 TALLOC_FREE(buf);
450                 return NT_STATUS_OK;
451         }
452
453         ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
454                                       iov, iovlen, msg_ctx->local);
455         if (ret != 0) {
456                 return map_nt_error_from_unix(ret);
457         }
458         return NT_STATUS_OK;
459 }
460
461 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
462                                                struct messaging_rec *rec)
463 {
464         struct messaging_rec *result;
465
466         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
467                                       1, rec->buf.length);
468         if (result == NULL) {
469                 return NULL;
470         }
471         *result = *rec;
472
473         /* Doesn't fail, see talloc_pooled_object */
474
475         result->buf.data = talloc_memdup(result, rec->buf.data,
476                                          rec->buf.length);
477         return result;
478 }
479
480 struct messaging_filtered_read_state {
481         struct tevent_context *ev;
482         struct messaging_context *msg_ctx;
483         void *tevent_handle;
484
485         bool (*filter)(struct messaging_rec *rec, void *private_data);
486         void *private_data;
487
488         struct messaging_rec *rec;
489 };
490
491 static void messaging_filtered_read_cleanup(struct tevent_req *req,
492                                             enum tevent_req_state req_state);
493
494 struct tevent_req *messaging_filtered_read_send(
495         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
496         struct messaging_context *msg_ctx,
497         bool (*filter)(struct messaging_rec *rec, void *private_data),
498         void *private_data)
499 {
500         struct tevent_req *req;
501         struct messaging_filtered_read_state *state;
502         size_t new_waiters_len;
503
504         req = tevent_req_create(mem_ctx, &state,
505                                 struct messaging_filtered_read_state);
506         if (req == NULL) {
507                 return NULL;
508         }
509         state->ev = ev;
510         state->msg_ctx = msg_ctx;
511         state->filter = filter;
512         state->private_data = private_data;
513
514         /*
515          * We have to defer the callback here, as we might be called from
516          * within a different tevent_context than state->ev
517          */
518         tevent_req_defer_callback(req, state->ev);
519
520         state->tevent_handle = messaging_dgm_register_tevent_context(
521                 state, msg_ctx, ev);
522         if (tevent_req_nomem(state, req)) {
523                 return tevent_req_post(req, ev);
524         }
525
526         /*
527          * We add ourselves to the "new_waiters" array, not the "waiters"
528          * array. If we are called from within messaging_read_done,
529          * messaging_dispatch_rec will be in an active for-loop on
530          * "waiters". We must be careful not to mess with this array, because
531          * it could mean that a single event is being delivered twice.
532          */
533
534         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
535
536         if (new_waiters_len == msg_ctx->num_new_waiters) {
537                 struct tevent_req **tmp;
538
539                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
540                                      struct tevent_req *, new_waiters_len+1);
541                 if (tevent_req_nomem(tmp, req)) {
542                         return tevent_req_post(req, ev);
543                 }
544                 msg_ctx->new_waiters = tmp;
545         }
546
547         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
548         msg_ctx->num_new_waiters += 1;
549         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
550
551         return req;
552 }
553
554 static void messaging_filtered_read_cleanup(struct tevent_req *req,
555                                             enum tevent_req_state req_state)
556 {
557         struct messaging_filtered_read_state *state = tevent_req_data(
558                 req, struct messaging_filtered_read_state);
559         struct messaging_context *msg_ctx = state->msg_ctx;
560         unsigned i;
561
562         tevent_req_set_cleanup_fn(req, NULL);
563
564         TALLOC_FREE(state->tevent_handle);
565
566         /*
567          * Just set the [new_]waiters entry to NULL, be careful not to mess
568          * with the other "waiters" array contents. We are often called from
569          * within "messaging_dispatch_rec", which loops over
570          * "waiters". Messing with the "waiters" array will mess up that
571          * for-loop.
572          */
573
574         for (i=0; i<msg_ctx->num_waiters; i++) {
575                 if (msg_ctx->waiters[i] == req) {
576                         msg_ctx->waiters[i] = NULL;
577                         return;
578                 }
579         }
580
581         for (i=0; i<msg_ctx->num_new_waiters; i++) {
582                 if (msg_ctx->new_waiters[i] == req) {
583                         msg_ctx->new_waiters[i] = NULL;
584                         return;
585                 }
586         }
587 }
588
589 static void messaging_filtered_read_done(struct tevent_req *req,
590                                          struct messaging_rec *rec)
591 {
592         struct messaging_filtered_read_state *state = tevent_req_data(
593                 req, struct messaging_filtered_read_state);
594
595         state->rec = messaging_rec_dup(state, rec);
596         if (tevent_req_nomem(state->rec, req)) {
597                 return;
598         }
599         tevent_req_done(req);
600 }
601
602 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
603                                  struct messaging_rec **presult)
604 {
605         struct messaging_filtered_read_state *state = tevent_req_data(
606                 req, struct messaging_filtered_read_state);
607         int err;
608
609         if (tevent_req_is_unix_error(req, &err)) {
610                 tevent_req_received(req);
611                 return err;
612         }
613         *presult = talloc_move(mem_ctx, &state->rec);
614         return 0;
615 }
616
617 struct messaging_read_state {
618         uint32_t msg_type;
619         struct messaging_rec *rec;
620 };
621
622 static bool messaging_read_filter(struct messaging_rec *rec,
623                                   void *private_data);
624 static void messaging_read_done(struct tevent_req *subreq);
625
626 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
627                                        struct tevent_context *ev,
628                                        struct messaging_context *msg,
629                                        uint32_t msg_type)
630 {
631         struct tevent_req *req, *subreq;
632         struct messaging_read_state *state;
633
634         req = tevent_req_create(mem_ctx, &state,
635                                 struct messaging_read_state);
636         if (req == NULL) {
637                 return NULL;
638         }
639         state->msg_type = msg_type;
640
641         subreq = messaging_filtered_read_send(state, ev, msg,
642                                               messaging_read_filter, state);
643         if (tevent_req_nomem(subreq, req)) {
644                 return tevent_req_post(req, ev);
645         }
646         tevent_req_set_callback(subreq, messaging_read_done, req);
647         return req;
648 }
649
650 static bool messaging_read_filter(struct messaging_rec *rec,
651                                   void *private_data)
652 {
653         struct messaging_read_state *state = talloc_get_type_abort(
654                 private_data, struct messaging_read_state);
655
656         return rec->msg_type == state->msg_type;
657 }
658
659 static void messaging_read_done(struct tevent_req *subreq)
660 {
661         struct tevent_req *req = tevent_req_callback_data(
662                 subreq, struct tevent_req);
663         struct messaging_read_state *state = tevent_req_data(
664                 req, struct messaging_read_state);
665         int ret;
666
667         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
668         TALLOC_FREE(subreq);
669         if (tevent_req_error(req, ret)) {
670                 return;
671         }
672         tevent_req_done(req);
673 }
674
675 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
676                         struct messaging_rec **presult)
677 {
678         struct messaging_read_state *state = tevent_req_data(
679                 req, struct messaging_read_state);
680         int err;
681
682         if (tevent_req_is_unix_error(req, &err)) {
683                 return err;
684         }
685         if (presult != NULL) {
686                 *presult = talloc_move(mem_ctx, &state->rec);
687         }
688         return 0;
689 }
690
691 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
692 {
693         if (msg_ctx->num_new_waiters == 0) {
694                 return true;
695         }
696
697         if (talloc_array_length(msg_ctx->waiters) <
698             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
699                 struct tevent_req **tmp;
700                 tmp = talloc_realloc(
701                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
702                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
703                 if (tmp == NULL) {
704                         DEBUG(1, ("%s: talloc failed\n", __func__));
705                         return false;
706                 }
707                 msg_ctx->waiters = tmp;
708         }
709
710         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
711                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
712
713         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
714         msg_ctx->num_new_waiters = 0;
715
716         return true;
717 }
718
719 struct messaging_defer_callback_state {
720         struct messaging_context *msg_ctx;
721         struct messaging_rec *rec;
722         void (*fn)(struct messaging_context *msg, void *private_data,
723                    uint32_t msg_type, struct server_id server_id,
724                    DATA_BLOB *data);
725         void *private_data;
726 };
727
728 static void messaging_defer_callback_trigger(struct tevent_context *ev,
729                                              struct tevent_immediate *im,
730                                              void *private_data);
731
732 static void messaging_defer_callback(
733         struct messaging_context *msg_ctx, struct messaging_rec *rec,
734         void (*fn)(struct messaging_context *msg, void *private_data,
735                    uint32_t msg_type, struct server_id server_id,
736                    DATA_BLOB *data),
737         void *private_data)
738 {
739         struct messaging_defer_callback_state *state;
740         struct tevent_immediate *im;
741
742         state = talloc(msg_ctx, struct messaging_defer_callback_state);
743         if (state == NULL) {
744                 DEBUG(1, ("talloc failed\n"));
745                 return;
746         }
747         state->msg_ctx = msg_ctx;
748         state->fn = fn;
749         state->private_data = private_data;
750
751         state->rec = messaging_rec_dup(state, rec);
752         if (state->rec == NULL) {
753                 DEBUG(1, ("talloc failed\n"));
754                 TALLOC_FREE(state);
755                 return;
756         }
757
758         im = tevent_create_immediate(state);
759         if (im == NULL) {
760                 DEBUG(1, ("tevent_create_immediate failed\n"));
761                 TALLOC_FREE(state);
762                 return;
763         }
764         tevent_schedule_immediate(im, msg_ctx->event_ctx,
765                                   messaging_defer_callback_trigger, state);
766 }
767
768 static void messaging_defer_callback_trigger(struct tevent_context *ev,
769                                              struct tevent_immediate *im,
770                                              void *private_data)
771 {
772         struct messaging_defer_callback_state *state = talloc_get_type_abort(
773                 private_data, struct messaging_defer_callback_state);
774         struct messaging_rec *rec = state->rec;
775
776         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
777                   &rec->buf);
778         TALLOC_FREE(state);
779 }
780
781 /*
782   Dispatch one messaging_rec
783 */
784 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
785                             struct messaging_rec *rec)
786 {
787         struct messaging_callback *cb, *next;
788         unsigned i;
789
790         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
791                 next = cb->next;
792                 if (cb->msg_type != rec->msg_type) {
793                         continue;
794                 }
795
796                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
797                         /*
798                          * This is a self-send. We are called here from
799                          * messaging_send(), and we don't want to directly
800                          * recurse into the callback but go via a
801                          * tevent_loop_once
802                          */
803                         messaging_defer_callback(msg_ctx, rec, cb->fn,
804                                                  cb->private_data);
805                 } else {
806                         /*
807                          * This comes from a different process. we are called
808                          * from the event loop, so we should call back
809                          * directly.
810                          */
811                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
812                                rec->src, &rec->buf);
813                 }
814                 /*
815                  * we continue looking for matching messages after finding
816                  * one. This matters for subsystems like the internal notify
817                  * code which register more than one handler for the same
818                  * message type
819                  */
820         }
821
822         if (!messaging_append_new_waiters(msg_ctx)) {
823                 return;
824         }
825
826         i = 0;
827         while (i < msg_ctx->num_waiters) {
828                 struct tevent_req *req;
829                 struct messaging_filtered_read_state *state;
830
831                 req = msg_ctx->waiters[i];
832                 if (req == NULL) {
833                         /*
834                          * This got cleaned up. In the meantime,
835                          * move everything down one. We need
836                          * to keep the order of waiters, as
837                          * other code may depend on this.
838                          */
839                         if (i < msg_ctx->num_waiters - 1) {
840                                 memmove(&msg_ctx->waiters[i],
841                                         &msg_ctx->waiters[i+1],
842                                         sizeof(struct tevent_req *) *
843                                             (msg_ctx->num_waiters - i - 1));
844                         }
845                         msg_ctx->num_waiters -= 1;
846                         continue;
847                 }
848
849                 state = tevent_req_data(
850                         req, struct messaging_filtered_read_state);
851                 if (state->filter(rec, state->private_data)) {
852                         messaging_filtered_read_done(req, rec);
853                 }
854
855                 i += 1;
856         }
857 }
858
859 static int mess_parent_dgm_cleanup(void *private_data);
860 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
861
862 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
863 {
864         struct tevent_req *req;
865
866         req = background_job_send(
867                 msg, msg->event_ctx, msg, NULL, 0,
868                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
869                             60*15),
870                 mess_parent_dgm_cleanup, msg);
871         if (req == NULL) {
872                 return false;
873         }
874         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
875         return true;
876 }
877
878 static int mess_parent_dgm_cleanup(void *private_data)
879 {
880         struct messaging_context *msg_ctx = talloc_get_type_abort(
881                 private_data, struct messaging_context);
882         int ret;
883
884         ret = messaging_dgm_wipe(msg_ctx);
885         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
886                    ret ? strerror(ret) : "ok"));
887         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
888                            60*15);
889 }
890
891 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
892 {
893         struct messaging_context *msg = tevent_req_callback_data(
894                 req, struct messaging_context);
895         NTSTATUS status;
896
897         status = background_job_recv(req);
898         TALLOC_FREE(req);
899         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
900                   nt_errstr(status)));
901
902         req = background_job_send(
903                 msg, msg->event_ctx, msg, NULL, 0,
904                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
905                             60*15),
906                 mess_parent_dgm_cleanup, msg);
907         if (req == NULL) {
908                 DEBUG(1, ("background_job_send failed\n"));
909         }
910         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
911 }
912
913 struct messaging_backend *messaging_local_backend(
914         struct messaging_context *msg_ctx)
915 {
916         return msg_ctx->local;
917 }
918
919 struct tevent_context *messaging_tevent_context(
920         struct messaging_context *msg_ctx)
921 {
922         return msg_ctx->event_ctx;
923 }
924
925 /** @} **/