messaging3: Remove an unnecessary variable
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_backend *local;
76         struct messaging_backend *remote;
77
78         bool *have_context;
79 };
80
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
82
83 /****************************************************************************
84  A useful function for testing the message system.
85 ****************************************************************************/
86
87 static void ping_message(struct messaging_context *msg_ctx,
88                          void *private_data,
89                          uint32_t msg_type,
90                          struct server_id src,
91                          DATA_BLOB *data)
92 {
93         struct server_id_buf idbuf;
94
95         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
96                   server_id_str_buf(src, &idbuf), (int)data->length,
97                   data->data ? (char *)data->data : ""));
98
99         messaging_send(msg_ctx, src, MSG_PONG, data);
100 }
101
102 /****************************************************************************
103  Register/replace a dispatch function for a particular message type.
104  JRA changed Dec 13 2006. Only one message handler now permitted per type.
105  *NOTE*: Dispatch functions must be able to cope with incoming
106  messages on an *odd* byte boundary.
107 ****************************************************************************/
108
109 struct msg_all {
110         struct messaging_context *msg_ctx;
111         int msg_type;
112         uint32 msg_flag;
113         const void *buf;
114         size_t len;
115         int n_sent;
116 };
117
118 /****************************************************************************
119  Send one of the messages for the broadcast.
120 ****************************************************************************/
121
122 static int traverse_fn(struct db_record *rec, const struct server_id *id,
123                        uint32_t msg_flags, void *state)
124 {
125         struct msg_all *msg_all = (struct msg_all *)state;
126         NTSTATUS status;
127
128         /* Don't send if the receiver hasn't registered an interest. */
129
130         if((msg_flags & msg_all->msg_flag) == 0) {
131                 return 0;
132         }
133
134         /* If the msg send fails because the pid was not found (i.e. smbd died), 
135          * the msg has already been deleted from the messages.tdb.*/
136
137         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
138                                     (const uint8_t *)msg_all->buf, msg_all->len);
139
140         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
141                 struct server_id_buf idbuf;
142
143                 /*
144                  * If the pid was not found delete the entry from
145                  * serverid.tdb
146                  */
147
148                 DEBUG(2, ("pid %s doesn't exist\n",
149                           server_id_str_buf(*id, &idbuf)));
150
151                 dbwrap_record_delete(rec);
152         }
153         msg_all->n_sent++;
154         return 0;
155 }
156
157 /**
158  * Send a message to all smbd processes.
159  *
160  * It isn't very efficient, but should be OK for the sorts of
161  * applications that use it. When we need efficient broadcast we can add
162  * it.
163  *
164  * @param n_sent Set to the number of messages sent.  This should be
165  * equal to the number of processes, but be careful for races.
166  *
167  * @retval True for success.
168  **/
169 bool message_send_all(struct messaging_context *msg_ctx,
170                       int msg_type,
171                       const void *buf, size_t len,
172                       int *n_sent)
173 {
174         struct msg_all msg_all;
175
176         msg_all.msg_type = msg_type;
177         if (msg_type < 0x100) {
178                 msg_all.msg_flag = FLAG_MSG_GENERAL;
179         } else if (msg_type > 0x100 && msg_type < 0x200) {
180                 msg_all.msg_flag = FLAG_MSG_NMBD;
181         } else if (msg_type > 0x200 && msg_type < 0x300) {
182                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
183         } else if (msg_type > 0x300 && msg_type < 0x400) {
184                 msg_all.msg_flag = FLAG_MSG_SMBD;
185         } else if (msg_type > 0x400 && msg_type < 0x600) {
186                 msg_all.msg_flag = FLAG_MSG_WINBIND;
187         } else if (msg_type > 4000 && msg_type < 5000) {
188                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
189         } else {
190                 return false;
191         }
192
193         msg_all.buf = buf;
194         msg_all.len = len;
195         msg_all.n_sent = 0;
196         msg_all.msg_ctx = msg_ctx;
197
198         serverid_traverse(traverse_fn, &msg_all);
199         if (n_sent)
200                 *n_sent = msg_all.n_sent;
201         return true;
202 }
203
204 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
205                                          struct tevent_context *ev)
206 {
207         struct messaging_context *ctx;
208         NTSTATUS status;
209         int ret;
210         static bool have_context = false;
211
212         if (have_context) {
213                 DEBUG(0, ("No two messaging contexts per process\n"));
214                 return NULL;
215         }
216
217
218         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
219                 return NULL;
220         }
221
222         ctx->id = procid_self();
223         ctx->event_ctx = ev;
224         ctx->have_context = &have_context;
225
226         ret = messaging_dgm_init(ctx, ctx, &ctx->local);
227
228         if (ret != 0) {
229                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
230                 TALLOC_FREE(ctx);
231                 return NULL;
232         }
233
234         if (lp_clustering()) {
235                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
236
237                 if (!NT_STATUS_IS_OK(status)) {
238                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
239                                   nt_errstr(status)));
240                         TALLOC_FREE(ctx);
241                         return NULL;
242                 }
243         }
244         ctx->id.vnn = get_my_vnn();
245
246         messaging_register(ctx, NULL, MSG_PING, ping_message);
247
248         /* Register some debugging related messages */
249
250         register_msg_pool_usage(ctx);
251         register_dmalloc_msgs(ctx);
252         debug_register_msgs(ctx);
253
254         have_context = true;
255         talloc_set_destructor(ctx, messaging_context_destructor);
256
257         return ctx;
258 }
259
260 static int messaging_context_destructor(struct messaging_context *msg_ctx)
261 {
262         SMB_ASSERT(*msg_ctx->have_context);
263         *msg_ctx->have_context = false;
264         return 0;
265 }
266
267 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
268 {
269         return msg_ctx->id;
270 }
271
272 /*
273  * re-init after a fork
274  */
275 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
276 {
277         NTSTATUS status;
278         int ret;
279
280         TALLOC_FREE(msg_ctx->local);
281
282         msg_ctx->id = procid_self();
283
284         ret = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
285         if (ret != 0) {
286                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
287                 return map_nt_error_from_unix(ret);
288         }
289
290         TALLOC_FREE(msg_ctx->remote);
291
292         if (lp_clustering()) {
293                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
294                                               &msg_ctx->remote);
295
296                 if (!NT_STATUS_IS_OK(status)) {
297                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
298                                   nt_errstr(status)));
299                         return status;
300                 }
301         }
302
303         return NT_STATUS_OK;
304 }
305
306
307 /*
308  * Register a dispatch function for a particular message type. Allow multiple
309  * registrants
310 */
311 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
312                             void *private_data,
313                             uint32_t msg_type,
314                             void (*fn)(struct messaging_context *msg,
315                                        void *private_data, 
316                                        uint32_t msg_type, 
317                                        struct server_id server_id,
318                                        DATA_BLOB *data))
319 {
320         struct messaging_callback *cb;
321
322         DEBUG(5, ("Registering messaging pointer for type %u - "
323                   "private_data=%p\n",
324                   (unsigned)msg_type, private_data));
325
326         /*
327          * Only one callback per type
328          */
329
330         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
331                 /* we allow a second registration of the same message
332                    type if it has a different private pointer. This is
333                    needed in, for example, the internal notify code,
334                    which creates a new notify context for each tree
335                    connect, and expects to receive messages to each of
336                    them. */
337                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
338                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
339                                   (unsigned)msg_type, private_data));
340                         cb->fn = fn;
341                         cb->private_data = private_data;
342                         return NT_STATUS_OK;
343                 }
344         }
345
346         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
347                 return NT_STATUS_NO_MEMORY;
348         }
349
350         cb->msg_type = msg_type;
351         cb->fn = fn;
352         cb->private_data = private_data;
353
354         DLIST_ADD(msg_ctx->callbacks, cb);
355         return NT_STATUS_OK;
356 }
357
358 /*
359   De-register the function for a particular message type.
360 */
361 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
362                           void *private_data)
363 {
364         struct messaging_callback *cb, *next;
365
366         for (cb = ctx->callbacks; cb; cb = next) {
367                 next = cb->next;
368                 if ((cb->msg_type == msg_type)
369                     && (cb->private_data == private_data)) {
370                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
371                                   (unsigned)msg_type, private_data));
372                         DLIST_REMOVE(ctx->callbacks, cb);
373                         TALLOC_FREE(cb);
374                 }
375         }
376 }
377
378 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
379                                    const struct server_id *dst)
380 {
381         return ((msg_ctx->id.vnn == dst->vnn) &&
382                 (msg_ctx->id.pid == dst->pid));
383 }
384
385 /*
386   Send a message to a particular server
387 */
388 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
389                         struct server_id server, uint32_t msg_type,
390                         const DATA_BLOB *data)
391 {
392         struct iovec iov;
393
394         iov.iov_base = data->data;
395         iov.iov_len = data->length;
396
397         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
398 }
399
400 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
401                             struct server_id server, uint32_t msg_type,
402                             const uint8_t *buf, size_t len)
403 {
404         DATA_BLOB blob = data_blob_const(buf, len);
405         return messaging_send(msg_ctx, server, msg_type, &blob);
406 }
407
408 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
409                             struct server_id server, uint32_t msg_type,
410                             const struct iovec *iov, int iovlen)
411 {
412         int ret;
413
414         if (server_id_is_disconnected(&server)) {
415                 return NT_STATUS_INVALID_PARAMETER_MIX;
416         }
417
418         if (!procid_is_local(&server)) {
419                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
420                                                msg_type, iov, iovlen,
421                                                msg_ctx->remote);
422                 if (ret != 0) {
423                         return map_nt_error_from_unix(ret);
424                 }
425                 return NT_STATUS_OK;
426         }
427
428         if (messaging_is_self_send(msg_ctx, &server)) {
429                 struct messaging_rec rec;
430                 uint8_t *buf;
431
432                 buf = iov_buf(talloc_tos(), iov, iovlen);
433                 if (buf == NULL) {
434                         return NT_STATUS_NO_MEMORY;
435                 }
436
437                 rec.msg_version = MESSAGE_VERSION;
438                 rec.msg_type = msg_type & MSG_TYPE_MASK;
439                 rec.dest = server;
440                 rec.src = msg_ctx->id;
441                 rec.buf = data_blob_const(buf, talloc_get_size(buf));
442                 messaging_dispatch_rec(msg_ctx, &rec);
443                 TALLOC_FREE(buf);
444                 return NT_STATUS_OK;
445         }
446
447         ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
448                                       iov, iovlen, msg_ctx->local);
449         if (ret != 0) {
450                 return map_nt_error_from_unix(ret);
451         }
452         return NT_STATUS_OK;
453 }
454
455 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
456                                                struct messaging_rec *rec)
457 {
458         struct messaging_rec *result;
459
460         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
461                                       1, rec->buf.length);
462         if (result == NULL) {
463                 return NULL;
464         }
465         *result = *rec;
466
467         /* Doesn't fail, see talloc_pooled_object */
468
469         result->buf.data = talloc_memdup(result, rec->buf.data,
470                                          rec->buf.length);
471         return result;
472 }
473
474 struct messaging_filtered_read_state {
475         struct tevent_context *ev;
476         struct messaging_context *msg_ctx;
477         void *tevent_handle;
478
479         bool (*filter)(struct messaging_rec *rec, void *private_data);
480         void *private_data;
481
482         struct messaging_rec *rec;
483 };
484
485 static void messaging_filtered_read_cleanup(struct tevent_req *req,
486                                             enum tevent_req_state req_state);
487
488 struct tevent_req *messaging_filtered_read_send(
489         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
490         struct messaging_context *msg_ctx,
491         bool (*filter)(struct messaging_rec *rec, void *private_data),
492         void *private_data)
493 {
494         struct tevent_req *req;
495         struct messaging_filtered_read_state *state;
496         size_t new_waiters_len;
497
498         req = tevent_req_create(mem_ctx, &state,
499                                 struct messaging_filtered_read_state);
500         if (req == NULL) {
501                 return NULL;
502         }
503         state->ev = ev;
504         state->msg_ctx = msg_ctx;
505         state->filter = filter;
506         state->private_data = private_data;
507
508         /*
509          * We have to defer the callback here, as we might be called from
510          * within a different tevent_context than state->ev
511          */
512         tevent_req_defer_callback(req, state->ev);
513
514         state->tevent_handle = messaging_dgm_register_tevent_context(
515                 state, msg_ctx, ev);
516         if (tevent_req_nomem(state, req)) {
517                 return tevent_req_post(req, ev);
518         }
519
520         /*
521          * We add ourselves to the "new_waiters" array, not the "waiters"
522          * array. If we are called from within messaging_read_done,
523          * messaging_dispatch_rec will be in an active for-loop on
524          * "waiters". We must be careful not to mess with this array, because
525          * it could mean that a single event is being delivered twice.
526          */
527
528         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
529
530         if (new_waiters_len == msg_ctx->num_new_waiters) {
531                 struct tevent_req **tmp;
532
533                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
534                                      struct tevent_req *, new_waiters_len+1);
535                 if (tevent_req_nomem(tmp, req)) {
536                         return tevent_req_post(req, ev);
537                 }
538                 msg_ctx->new_waiters = tmp;
539         }
540
541         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
542         msg_ctx->num_new_waiters += 1;
543         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
544
545         return req;
546 }
547
548 static void messaging_filtered_read_cleanup(struct tevent_req *req,
549                                             enum tevent_req_state req_state)
550 {
551         struct messaging_filtered_read_state *state = tevent_req_data(
552                 req, struct messaging_filtered_read_state);
553         struct messaging_context *msg_ctx = state->msg_ctx;
554         unsigned i;
555
556         tevent_req_set_cleanup_fn(req, NULL);
557
558         TALLOC_FREE(state->tevent_handle);
559
560         /*
561          * Just set the [new_]waiters entry to NULL, be careful not to mess
562          * with the other "waiters" array contents. We are often called from
563          * within "messaging_dispatch_rec", which loops over
564          * "waiters". Messing with the "waiters" array will mess up that
565          * for-loop.
566          */
567
568         for (i=0; i<msg_ctx->num_waiters; i++) {
569                 if (msg_ctx->waiters[i] == req) {
570                         msg_ctx->waiters[i] = NULL;
571                         return;
572                 }
573         }
574
575         for (i=0; i<msg_ctx->num_new_waiters; i++) {
576                 if (msg_ctx->new_waiters[i] == req) {
577                         msg_ctx->new_waiters[i] = NULL;
578                         return;
579                 }
580         }
581 }
582
583 static void messaging_filtered_read_done(struct tevent_req *req,
584                                          struct messaging_rec *rec)
585 {
586         struct messaging_filtered_read_state *state = tevent_req_data(
587                 req, struct messaging_filtered_read_state);
588
589         state->rec = messaging_rec_dup(state, rec);
590         if (tevent_req_nomem(state->rec, req)) {
591                 return;
592         }
593         tevent_req_done(req);
594 }
595
596 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
597                                  struct messaging_rec **presult)
598 {
599         struct messaging_filtered_read_state *state = tevent_req_data(
600                 req, struct messaging_filtered_read_state);
601         int err;
602
603         if (tevent_req_is_unix_error(req, &err)) {
604                 tevent_req_received(req);
605                 return err;
606         }
607         *presult = talloc_move(mem_ctx, &state->rec);
608         return 0;
609 }
610
611 struct messaging_read_state {
612         uint32_t msg_type;
613         struct messaging_rec *rec;
614 };
615
616 static bool messaging_read_filter(struct messaging_rec *rec,
617                                   void *private_data);
618 static void messaging_read_done(struct tevent_req *subreq);
619
620 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
621                                        struct tevent_context *ev,
622                                        struct messaging_context *msg,
623                                        uint32_t msg_type)
624 {
625         struct tevent_req *req, *subreq;
626         struct messaging_read_state *state;
627
628         req = tevent_req_create(mem_ctx, &state,
629                                 struct messaging_read_state);
630         if (req == NULL) {
631                 return NULL;
632         }
633         state->msg_type = msg_type;
634
635         subreq = messaging_filtered_read_send(state, ev, msg,
636                                               messaging_read_filter, state);
637         if (tevent_req_nomem(subreq, req)) {
638                 return tevent_req_post(req, ev);
639         }
640         tevent_req_set_callback(subreq, messaging_read_done, req);
641         return req;
642 }
643
644 static bool messaging_read_filter(struct messaging_rec *rec,
645                                   void *private_data)
646 {
647         struct messaging_read_state *state = talloc_get_type_abort(
648                 private_data, struct messaging_read_state);
649
650         return rec->msg_type == state->msg_type;
651 }
652
653 static void messaging_read_done(struct tevent_req *subreq)
654 {
655         struct tevent_req *req = tevent_req_callback_data(
656                 subreq, struct tevent_req);
657         struct messaging_read_state *state = tevent_req_data(
658                 req, struct messaging_read_state);
659         int ret;
660
661         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
662         TALLOC_FREE(subreq);
663         if (tevent_req_error(req, ret)) {
664                 return;
665         }
666         tevent_req_done(req);
667 }
668
669 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
670                         struct messaging_rec **presult)
671 {
672         struct messaging_read_state *state = tevent_req_data(
673                 req, struct messaging_read_state);
674         int err;
675
676         if (tevent_req_is_unix_error(req, &err)) {
677                 return err;
678         }
679         if (presult != NULL) {
680                 *presult = talloc_move(mem_ctx, &state->rec);
681         }
682         return 0;
683 }
684
685 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
686 {
687         if (msg_ctx->num_new_waiters == 0) {
688                 return true;
689         }
690
691         if (talloc_array_length(msg_ctx->waiters) <
692             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
693                 struct tevent_req **tmp;
694                 tmp = talloc_realloc(
695                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
696                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
697                 if (tmp == NULL) {
698                         DEBUG(1, ("%s: talloc failed\n", __func__));
699                         return false;
700                 }
701                 msg_ctx->waiters = tmp;
702         }
703
704         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
705                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
706
707         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
708         msg_ctx->num_new_waiters = 0;
709
710         return true;
711 }
712
713 struct messaging_defer_callback_state {
714         struct messaging_context *msg_ctx;
715         struct messaging_rec *rec;
716         void (*fn)(struct messaging_context *msg, void *private_data,
717                    uint32_t msg_type, struct server_id server_id,
718                    DATA_BLOB *data);
719         void *private_data;
720 };
721
722 static void messaging_defer_callback_trigger(struct tevent_context *ev,
723                                              struct tevent_immediate *im,
724                                              void *private_data);
725
726 static void messaging_defer_callback(
727         struct messaging_context *msg_ctx, struct messaging_rec *rec,
728         void (*fn)(struct messaging_context *msg, void *private_data,
729                    uint32_t msg_type, struct server_id server_id,
730                    DATA_BLOB *data),
731         void *private_data)
732 {
733         struct messaging_defer_callback_state *state;
734         struct tevent_immediate *im;
735
736         state = talloc(msg_ctx, struct messaging_defer_callback_state);
737         if (state == NULL) {
738                 DEBUG(1, ("talloc failed\n"));
739                 return;
740         }
741         state->msg_ctx = msg_ctx;
742         state->fn = fn;
743         state->private_data = private_data;
744
745         state->rec = messaging_rec_dup(state, rec);
746         if (state->rec == NULL) {
747                 DEBUG(1, ("talloc failed\n"));
748                 TALLOC_FREE(state);
749                 return;
750         }
751
752         im = tevent_create_immediate(state);
753         if (im == NULL) {
754                 DEBUG(1, ("tevent_create_immediate failed\n"));
755                 TALLOC_FREE(state);
756                 return;
757         }
758         tevent_schedule_immediate(im, msg_ctx->event_ctx,
759                                   messaging_defer_callback_trigger, state);
760 }
761
762 static void messaging_defer_callback_trigger(struct tevent_context *ev,
763                                              struct tevent_immediate *im,
764                                              void *private_data)
765 {
766         struct messaging_defer_callback_state *state = talloc_get_type_abort(
767                 private_data, struct messaging_defer_callback_state);
768         struct messaging_rec *rec = state->rec;
769
770         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
771                   &rec->buf);
772         TALLOC_FREE(state);
773 }
774
775 /*
776   Dispatch one messaging_rec
777 */
778 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
779                             struct messaging_rec *rec)
780 {
781         struct messaging_callback *cb, *next;
782         unsigned i;
783
784         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
785                 next = cb->next;
786                 if (cb->msg_type != rec->msg_type) {
787                         continue;
788                 }
789
790                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
791                         /*
792                          * This is a self-send. We are called here from
793                          * messaging_send(), and we don't want to directly
794                          * recurse into the callback but go via a
795                          * tevent_loop_once
796                          */
797                         messaging_defer_callback(msg_ctx, rec, cb->fn,
798                                                  cb->private_data);
799                 } else {
800                         /*
801                          * This comes from a different process. we are called
802                          * from the event loop, so we should call back
803                          * directly.
804                          */
805                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
806                                rec->src, &rec->buf);
807                 }
808                 /*
809                  * we continue looking for matching messages after finding
810                  * one. This matters for subsystems like the internal notify
811                  * code which register more than one handler for the same
812                  * message type
813                  */
814         }
815
816         if (!messaging_append_new_waiters(msg_ctx)) {
817                 return;
818         }
819
820         i = 0;
821         while (i < msg_ctx->num_waiters) {
822                 struct tevent_req *req;
823                 struct messaging_filtered_read_state *state;
824
825                 req = msg_ctx->waiters[i];
826                 if (req == NULL) {
827                         /*
828                          * This got cleaned up. In the meantime,
829                          * move everything down one. We need
830                          * to keep the order of waiters, as
831                          * other code may depend on this.
832                          */
833                         if (i < msg_ctx->num_waiters - 1) {
834                                 memmove(&msg_ctx->waiters[i],
835                                         &msg_ctx->waiters[i+1],
836                                         sizeof(struct tevent_req *) *
837                                             (msg_ctx->num_waiters - i - 1));
838                         }
839                         msg_ctx->num_waiters -= 1;
840                         continue;
841                 }
842
843                 state = tevent_req_data(
844                         req, struct messaging_filtered_read_state);
845                 if (state->filter(rec, state->private_data)) {
846                         messaging_filtered_read_done(req, rec);
847                 }
848
849                 i += 1;
850         }
851 }
852
853 static int mess_parent_dgm_cleanup(void *private_data);
854 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
855
856 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
857 {
858         struct tevent_req *req;
859
860         req = background_job_send(
861                 msg, msg->event_ctx, msg, NULL, 0,
862                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
863                             60*15),
864                 mess_parent_dgm_cleanup, msg);
865         if (req == NULL) {
866                 return false;
867         }
868         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
869         return true;
870 }
871
872 static int mess_parent_dgm_cleanup(void *private_data)
873 {
874         struct messaging_context *msg_ctx = talloc_get_type_abort(
875                 private_data, struct messaging_context);
876         int ret;
877
878         ret = messaging_dgm_wipe(msg_ctx);
879         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
880                    ret ? strerror(ret) : "ok"));
881         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
882                            60*15);
883 }
884
885 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
886 {
887         struct messaging_context *msg = tevent_req_callback_data(
888                 req, struct messaging_context);
889         NTSTATUS status;
890
891         status = background_job_recv(req);
892         TALLOC_FREE(req);
893         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
894                   nt_errstr(status)));
895
896         req = background_job_send(
897                 msg, msg->event_ctx, msg, NULL, 0,
898                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
899                             60*15),
900                 mess_parent_dgm_cleanup, msg);
901         if (req == NULL) {
902                 DEBUG(1, ("background_job_send failed\n"));
903         }
904         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
905 }
906
907 struct messaging_backend *messaging_local_backend(
908         struct messaging_context *msg_ctx)
909 {
910         return msg_ctx->local;
911 }
912
913 struct tevent_context *messaging_tevent_context(
914         struct messaging_context *msg_ctx)
915 {
916         return msg_ctx->event_ctx;
917 }
918
919 /** @} **/