80ecec4aaa87e444c34f779ccd6772e001cae977
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55
56 struct messaging_callback {
57         struct messaging_callback *prev, *next;
58         uint32 msg_type;
59         void (*fn)(struct messaging_context *msg, void *private_data, 
60                    uint32_t msg_type, 
61                    struct server_id server_id, DATA_BLOB *data);
62         void *private_data;
63 };
64
65 struct messaging_context {
66         struct server_id id;
67         struct tevent_context *event_ctx;
68         struct messaging_callback *callbacks;
69
70         struct tevent_req **new_waiters;
71         unsigned num_new_waiters;
72
73         struct tevent_req **waiters;
74         unsigned num_waiters;
75
76         struct messaging_dgm_context *local;
77
78         struct messaging_backend *remote;
79 };
80
81 struct messaging_hdr {
82         int msg_type;
83         struct server_id dst;
84         struct server_id src;
85 };
86
87 /****************************************************************************
88  A useful function for testing the message system.
89 ****************************************************************************/
90
91 static void ping_message(struct messaging_context *msg_ctx,
92                          void *private_data,
93                          uint32_t msg_type,
94                          struct server_id src,
95                          DATA_BLOB *data)
96 {
97         struct server_id_buf idbuf;
98
99         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
100                   server_id_str_buf(src, &idbuf), (int)data->length,
101                   data->data ? (char *)data->data : ""));
102
103         messaging_send(msg_ctx, src, MSG_PONG, data);
104 }
105
106 /****************************************************************************
107  Register/replace a dispatch function for a particular message type.
108  JRA changed Dec 13 2006. Only one message handler now permitted per type.
109  *NOTE*: Dispatch functions must be able to cope with incoming
110  messages on an *odd* byte boundary.
111 ****************************************************************************/
112
113 struct msg_all {
114         struct messaging_context *msg_ctx;
115         int msg_type;
116         uint32 msg_flag;
117         const void *buf;
118         size_t len;
119         int n_sent;
120 };
121
122 /****************************************************************************
123  Send one of the messages for the broadcast.
124 ****************************************************************************/
125
126 static int traverse_fn(struct db_record *rec, const struct server_id *id,
127                        uint32_t msg_flags, void *state)
128 {
129         struct msg_all *msg_all = (struct msg_all *)state;
130         NTSTATUS status;
131
132         /* Don't send if the receiver hasn't registered an interest. */
133
134         if((msg_flags & msg_all->msg_flag) == 0) {
135                 return 0;
136         }
137
138         /* If the msg send fails because the pid was not found (i.e. smbd died), 
139          * the msg has already been deleted from the messages.tdb.*/
140
141         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
142                                     (const uint8_t *)msg_all->buf, msg_all->len);
143
144         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
145                 struct server_id_buf idbuf;
146
147                 /*
148                  * If the pid was not found delete the entry from
149                  * serverid.tdb
150                  */
151
152                 DEBUG(2, ("pid %s doesn't exist\n",
153                           server_id_str_buf(*id, &idbuf)));
154
155                 dbwrap_record_delete(rec);
156         }
157         msg_all->n_sent++;
158         return 0;
159 }
160
161 /**
162  * Send a message to all smbd processes.
163  *
164  * It isn't very efficient, but should be OK for the sorts of
165  * applications that use it. When we need efficient broadcast we can add
166  * it.
167  *
168  * @param n_sent Set to the number of messages sent.  This should be
169  * equal to the number of processes, but be careful for races.
170  *
171  * @retval True for success.
172  **/
173 bool message_send_all(struct messaging_context *msg_ctx,
174                       int msg_type,
175                       const void *buf, size_t len,
176                       int *n_sent)
177 {
178         struct msg_all msg_all;
179
180         msg_all.msg_type = msg_type;
181         if (msg_type < 0x100) {
182                 msg_all.msg_flag = FLAG_MSG_GENERAL;
183         } else if (msg_type > 0x100 && msg_type < 0x200) {
184                 msg_all.msg_flag = FLAG_MSG_NMBD;
185         } else if (msg_type > 0x200 && msg_type < 0x300) {
186                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
187         } else if (msg_type > 0x300 && msg_type < 0x400) {
188                 msg_all.msg_flag = FLAG_MSG_SMBD;
189         } else if (msg_type > 0x400 && msg_type < 0x600) {
190                 msg_all.msg_flag = FLAG_MSG_WINBIND;
191         } else if (msg_type > 4000 && msg_type < 5000) {
192                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
193         } else {
194                 return false;
195         }
196
197         msg_all.buf = buf;
198         msg_all.len = len;
199         msg_all.n_sent = 0;
200         msg_all.msg_ctx = msg_ctx;
201
202         serverid_traverse(traverse_fn, &msg_all);
203         if (n_sent)
204                 *n_sent = msg_all.n_sent;
205         return true;
206 }
207
208 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
209                               void *private_data)
210 {
211         struct messaging_context *msg_ctx = talloc_get_type_abort(
212                 private_data, struct messaging_context);
213         const struct messaging_hdr *hdr;
214         struct server_id_buf idbuf;
215         struct messaging_rec rec;
216
217         if (msg_len < sizeof(*hdr)) {
218                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
219                 return;
220         }
221
222         /*
223          * messages_dgm guarantees alignment, so we can cast here
224          */
225         hdr = (const struct messaging_hdr *)msg;
226
227         DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
228                    (unsigned)hdr->msg_type, (unsigned)(msg_len - sizeof(*hdr)),
229                    server_id_str_buf(hdr->src, &idbuf)));
230
231         rec = (struct messaging_rec) {
232                 .msg_version = MESSAGE_VERSION,
233                 .msg_type = hdr->msg_type,
234                 .src = hdr->src,
235                 .dest = hdr->dst,
236                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
237                 .buf.length = msg_len - sizeof(*hdr)
238         };
239
240         messaging_dispatch_rec(msg_ctx, &rec);
241 }
242
243 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
244                                          struct tevent_context *ev)
245 {
246         struct messaging_context *ctx;
247         NTSTATUS status;
248         int ret;
249
250         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
251                 return NULL;
252         }
253
254         ctx->id = procid_self();
255         ctx->event_ctx = ev;
256
257         sec_init();
258
259         ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
260                                  lp_cache_directory(), sec_initial_uid(),
261                                  messaging_recv_cb, ctx, &ctx->local);
262
263         if (ret != 0) {
264                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
265                 TALLOC_FREE(ctx);
266                 return NULL;
267         }
268
269         if (lp_clustering()) {
270                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
271
272                 if (!NT_STATUS_IS_OK(status)) {
273                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
274                                   nt_errstr(status)));
275                         TALLOC_FREE(ctx);
276                         return NULL;
277                 }
278         }
279         ctx->id.vnn = get_my_vnn();
280
281         messaging_register(ctx, NULL, MSG_PING, ping_message);
282
283         /* Register some debugging related messages */
284
285         register_msg_pool_usage(ctx);
286         register_dmalloc_msgs(ctx);
287         debug_register_msgs(ctx);
288
289         return ctx;
290 }
291
292 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
293 {
294         return msg_ctx->id;
295 }
296
297 /*
298  * re-init after a fork
299  */
300 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
301 {
302         NTSTATUS status;
303         int ret;
304
305         TALLOC_FREE(msg_ctx->local);
306
307         msg_ctx->id = procid_self();
308
309         ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx, msg_ctx->id,
310                                  lp_cache_directory(), sec_initial_uid(),
311                                  messaging_recv_cb, msg_ctx,
312                                  &msg_ctx->local);
313         if (ret != 0) {
314                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
315                 return map_nt_error_from_unix(ret);
316         }
317
318         TALLOC_FREE(msg_ctx->remote);
319
320         if (lp_clustering()) {
321                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
322                                               &msg_ctx->remote);
323
324                 if (!NT_STATUS_IS_OK(status)) {
325                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
326                                   nt_errstr(status)));
327                         return status;
328                 }
329         }
330
331         return NT_STATUS_OK;
332 }
333
334
335 /*
336  * Register a dispatch function for a particular message type. Allow multiple
337  * registrants
338 */
339 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
340                             void *private_data,
341                             uint32_t msg_type,
342                             void (*fn)(struct messaging_context *msg,
343                                        void *private_data, 
344                                        uint32_t msg_type, 
345                                        struct server_id server_id,
346                                        DATA_BLOB *data))
347 {
348         struct messaging_callback *cb;
349
350         DEBUG(5, ("Registering messaging pointer for type %u - "
351                   "private_data=%p\n",
352                   (unsigned)msg_type, private_data));
353
354         /*
355          * Only one callback per type
356          */
357
358         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
359                 /* we allow a second registration of the same message
360                    type if it has a different private pointer. This is
361                    needed in, for example, the internal notify code,
362                    which creates a new notify context for each tree
363                    connect, and expects to receive messages to each of
364                    them. */
365                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
366                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
367                                   (unsigned)msg_type, private_data));
368                         cb->fn = fn;
369                         cb->private_data = private_data;
370                         return NT_STATUS_OK;
371                 }
372         }
373
374         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
375                 return NT_STATUS_NO_MEMORY;
376         }
377
378         cb->msg_type = msg_type;
379         cb->fn = fn;
380         cb->private_data = private_data;
381
382         DLIST_ADD(msg_ctx->callbacks, cb);
383         return NT_STATUS_OK;
384 }
385
386 /*
387   De-register the function for a particular message type.
388 */
389 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
390                           void *private_data)
391 {
392         struct messaging_callback *cb, *next;
393
394         for (cb = ctx->callbacks; cb; cb = next) {
395                 next = cb->next;
396                 if ((cb->msg_type == msg_type)
397                     && (cb->private_data == private_data)) {
398                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
399                                   (unsigned)msg_type, private_data));
400                         DLIST_REMOVE(ctx->callbacks, cb);
401                         TALLOC_FREE(cb);
402                 }
403         }
404 }
405
406 /*
407   Send a message to a particular server
408 */
409 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
410                         struct server_id server, uint32_t msg_type,
411                         const DATA_BLOB *data)
412 {
413         struct iovec iov;
414
415         iov.iov_base = data->data;
416         iov.iov_len = data->length;
417
418         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
419 }
420
421 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
422                             struct server_id server, uint32_t msg_type,
423                             const uint8_t *buf, size_t len)
424 {
425         DATA_BLOB blob = data_blob_const(buf, len);
426         return messaging_send(msg_ctx, server, msg_type, &blob);
427 }
428
429 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
430                             struct server_id server, uint32_t msg_type,
431                             const struct iovec *iov, int iovlen)
432 {
433         int ret;
434         struct messaging_hdr hdr;
435         struct iovec iov2[iovlen+1];
436
437         if (server_id_is_disconnected(&server)) {
438                 return NT_STATUS_INVALID_PARAMETER_MIX;
439         }
440
441         if (!procid_is_local(&server)) {
442                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
443                                                msg_type, iov, iovlen,
444                                                msg_ctx->remote);
445                 if (ret != 0) {
446                         return map_nt_error_from_unix(ret);
447                 }
448                 return NT_STATUS_OK;
449         }
450
451         if (server_id_same_process(&msg_ctx->id, &server)) {
452                 struct messaging_rec rec;
453                 uint8_t *buf;
454
455                 /*
456                  * Self-send, directly dispatch
457                  */
458
459                 buf = iov_buf(talloc_tos(), iov, iovlen);
460                 if (buf == NULL) {
461                         return NT_STATUS_NO_MEMORY;
462                 }
463
464                 rec.msg_version = MESSAGE_VERSION;
465                 rec.msg_type = msg_type & MSG_TYPE_MASK;
466                 rec.dest = server;
467                 rec.src = msg_ctx->id;
468                 rec.buf = data_blob_const(buf, talloc_get_size(buf));
469                 messaging_dispatch_rec(msg_ctx, &rec);
470                 TALLOC_FREE(buf);
471                 return NT_STATUS_OK;
472         }
473
474         hdr = (struct messaging_hdr) {
475                 .msg_type = msg_type,
476                 .dst = server,
477                 .src = msg_ctx->id
478         };
479         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
480         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
481
482         become_root();
483         ret = messaging_dgm_send(msg_ctx->local, server.pid, iov2, iovlen+1);
484         unbecome_root();
485
486         if (ret != 0) {
487                 return map_nt_error_from_unix(ret);
488         }
489         return NT_STATUS_OK;
490 }
491
492 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
493                                                struct messaging_rec *rec)
494 {
495         struct messaging_rec *result;
496
497         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
498                                       1, rec->buf.length);
499         if (result == NULL) {
500                 return NULL;
501         }
502         *result = *rec;
503
504         /* Doesn't fail, see talloc_pooled_object */
505
506         result->buf.data = talloc_memdup(result, rec->buf.data,
507                                          rec->buf.length);
508         return result;
509 }
510
511 struct messaging_filtered_read_state {
512         struct tevent_context *ev;
513         struct messaging_context *msg_ctx;
514         void *tevent_handle;
515
516         bool (*filter)(struct messaging_rec *rec, void *private_data);
517         void *private_data;
518
519         struct messaging_rec *rec;
520 };
521
522 static void messaging_filtered_read_cleanup(struct tevent_req *req,
523                                             enum tevent_req_state req_state);
524
525 struct tevent_req *messaging_filtered_read_send(
526         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
527         struct messaging_context *msg_ctx,
528         bool (*filter)(struct messaging_rec *rec, void *private_data),
529         void *private_data)
530 {
531         struct tevent_req *req;
532         struct messaging_filtered_read_state *state;
533         size_t new_waiters_len;
534
535         req = tevent_req_create(mem_ctx, &state,
536                                 struct messaging_filtered_read_state);
537         if (req == NULL) {
538                 return NULL;
539         }
540         state->ev = ev;
541         state->msg_ctx = msg_ctx;
542         state->filter = filter;
543         state->private_data = private_data;
544
545         /*
546          * We have to defer the callback here, as we might be called from
547          * within a different tevent_context than state->ev
548          */
549         tevent_req_defer_callback(req, state->ev);
550
551         state->tevent_handle = messaging_dgm_register_tevent_context(
552                 state, msg_ctx->local, ev);
553         if (tevent_req_nomem(state, req)) {
554                 return tevent_req_post(req, ev);
555         }
556
557         /*
558          * We add ourselves to the "new_waiters" array, not the "waiters"
559          * array. If we are called from within messaging_read_done,
560          * messaging_dispatch_rec will be in an active for-loop on
561          * "waiters". We must be careful not to mess with this array, because
562          * it could mean that a single event is being delivered twice.
563          */
564
565         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
566
567         if (new_waiters_len == msg_ctx->num_new_waiters) {
568                 struct tevent_req **tmp;
569
570                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
571                                      struct tevent_req *, new_waiters_len+1);
572                 if (tevent_req_nomem(tmp, req)) {
573                         return tevent_req_post(req, ev);
574                 }
575                 msg_ctx->new_waiters = tmp;
576         }
577
578         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
579         msg_ctx->num_new_waiters += 1;
580         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
581
582         return req;
583 }
584
585 static void messaging_filtered_read_cleanup(struct tevent_req *req,
586                                             enum tevent_req_state req_state)
587 {
588         struct messaging_filtered_read_state *state = tevent_req_data(
589                 req, struct messaging_filtered_read_state);
590         struct messaging_context *msg_ctx = state->msg_ctx;
591         unsigned i;
592
593         tevent_req_set_cleanup_fn(req, NULL);
594
595         TALLOC_FREE(state->tevent_handle);
596
597         /*
598          * Just set the [new_]waiters entry to NULL, be careful not to mess
599          * with the other "waiters" array contents. We are often called from
600          * within "messaging_dispatch_rec", which loops over
601          * "waiters". Messing with the "waiters" array will mess up that
602          * for-loop.
603          */
604
605         for (i=0; i<msg_ctx->num_waiters; i++) {
606                 if (msg_ctx->waiters[i] == req) {
607                         msg_ctx->waiters[i] = NULL;
608                         return;
609                 }
610         }
611
612         for (i=0; i<msg_ctx->num_new_waiters; i++) {
613                 if (msg_ctx->new_waiters[i] == req) {
614                         msg_ctx->new_waiters[i] = NULL;
615                         return;
616                 }
617         }
618 }
619
620 static void messaging_filtered_read_done(struct tevent_req *req,
621                                          struct messaging_rec *rec)
622 {
623         struct messaging_filtered_read_state *state = tevent_req_data(
624                 req, struct messaging_filtered_read_state);
625
626         state->rec = messaging_rec_dup(state, rec);
627         if (tevent_req_nomem(state->rec, req)) {
628                 return;
629         }
630         tevent_req_done(req);
631 }
632
633 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
634                                  struct messaging_rec **presult)
635 {
636         struct messaging_filtered_read_state *state = tevent_req_data(
637                 req, struct messaging_filtered_read_state);
638         int err;
639
640         if (tevent_req_is_unix_error(req, &err)) {
641                 tevent_req_received(req);
642                 return err;
643         }
644         *presult = talloc_move(mem_ctx, &state->rec);
645         return 0;
646 }
647
648 struct messaging_read_state {
649         uint32_t msg_type;
650         struct messaging_rec *rec;
651 };
652
653 static bool messaging_read_filter(struct messaging_rec *rec,
654                                   void *private_data);
655 static void messaging_read_done(struct tevent_req *subreq);
656
657 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
658                                        struct tevent_context *ev,
659                                        struct messaging_context *msg,
660                                        uint32_t msg_type)
661 {
662         struct tevent_req *req, *subreq;
663         struct messaging_read_state *state;
664
665         req = tevent_req_create(mem_ctx, &state,
666                                 struct messaging_read_state);
667         if (req == NULL) {
668                 return NULL;
669         }
670         state->msg_type = msg_type;
671
672         subreq = messaging_filtered_read_send(state, ev, msg,
673                                               messaging_read_filter, state);
674         if (tevent_req_nomem(subreq, req)) {
675                 return tevent_req_post(req, ev);
676         }
677         tevent_req_set_callback(subreq, messaging_read_done, req);
678         return req;
679 }
680
681 static bool messaging_read_filter(struct messaging_rec *rec,
682                                   void *private_data)
683 {
684         struct messaging_read_state *state = talloc_get_type_abort(
685                 private_data, struct messaging_read_state);
686
687         return rec->msg_type == state->msg_type;
688 }
689
690 static void messaging_read_done(struct tevent_req *subreq)
691 {
692         struct tevent_req *req = tevent_req_callback_data(
693                 subreq, struct tevent_req);
694         struct messaging_read_state *state = tevent_req_data(
695                 req, struct messaging_read_state);
696         int ret;
697
698         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
699         TALLOC_FREE(subreq);
700         if (tevent_req_error(req, ret)) {
701                 return;
702         }
703         tevent_req_done(req);
704 }
705
706 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
707                         struct messaging_rec **presult)
708 {
709         struct messaging_read_state *state = tevent_req_data(
710                 req, struct messaging_read_state);
711         int err;
712
713         if (tevent_req_is_unix_error(req, &err)) {
714                 return err;
715         }
716         if (presult != NULL) {
717                 *presult = talloc_move(mem_ctx, &state->rec);
718         }
719         return 0;
720 }
721
722 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
723 {
724         if (msg_ctx->num_new_waiters == 0) {
725                 return true;
726         }
727
728         if (talloc_array_length(msg_ctx->waiters) <
729             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
730                 struct tevent_req **tmp;
731                 tmp = talloc_realloc(
732                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
733                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
734                 if (tmp == NULL) {
735                         DEBUG(1, ("%s: talloc failed\n", __func__));
736                         return false;
737                 }
738                 msg_ctx->waiters = tmp;
739         }
740
741         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
742                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
743
744         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
745         msg_ctx->num_new_waiters = 0;
746
747         return true;
748 }
749
750 struct messaging_defer_callback_state {
751         struct messaging_context *msg_ctx;
752         struct messaging_rec *rec;
753         void (*fn)(struct messaging_context *msg, void *private_data,
754                    uint32_t msg_type, struct server_id server_id,
755                    DATA_BLOB *data);
756         void *private_data;
757 };
758
759 static void messaging_defer_callback_trigger(struct tevent_context *ev,
760                                              struct tevent_immediate *im,
761                                              void *private_data);
762
763 static void messaging_defer_callback(
764         struct messaging_context *msg_ctx, struct messaging_rec *rec,
765         void (*fn)(struct messaging_context *msg, void *private_data,
766                    uint32_t msg_type, struct server_id server_id,
767                    DATA_BLOB *data),
768         void *private_data)
769 {
770         struct messaging_defer_callback_state *state;
771         struct tevent_immediate *im;
772
773         state = talloc(msg_ctx, struct messaging_defer_callback_state);
774         if (state == NULL) {
775                 DEBUG(1, ("talloc failed\n"));
776                 return;
777         }
778         state->msg_ctx = msg_ctx;
779         state->fn = fn;
780         state->private_data = private_data;
781
782         state->rec = messaging_rec_dup(state, rec);
783         if (state->rec == NULL) {
784                 DEBUG(1, ("talloc failed\n"));
785                 TALLOC_FREE(state);
786                 return;
787         }
788
789         im = tevent_create_immediate(state);
790         if (im == NULL) {
791                 DEBUG(1, ("tevent_create_immediate failed\n"));
792                 TALLOC_FREE(state);
793                 return;
794         }
795         tevent_schedule_immediate(im, msg_ctx->event_ctx,
796                                   messaging_defer_callback_trigger, state);
797 }
798
799 static void messaging_defer_callback_trigger(struct tevent_context *ev,
800                                              struct tevent_immediate *im,
801                                              void *private_data)
802 {
803         struct messaging_defer_callback_state *state = talloc_get_type_abort(
804                 private_data, struct messaging_defer_callback_state);
805         struct messaging_rec *rec = state->rec;
806
807         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
808                   &rec->buf);
809         TALLOC_FREE(state);
810 }
811
812 /*
813   Dispatch one messaging_rec
814 */
815 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
816                             struct messaging_rec *rec)
817 {
818         struct messaging_callback *cb, *next;
819         unsigned i;
820
821         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
822                 next = cb->next;
823                 if (cb->msg_type != rec->msg_type) {
824                         continue;
825                 }
826
827                 if (server_id_same_process(&rec->src, &rec->dest)) {
828                         /*
829                          * This is a self-send. We are called here from
830                          * messaging_send(), and we don't want to directly
831                          * recurse into the callback but go via a
832                          * tevent_loop_once
833                          */
834                         messaging_defer_callback(msg_ctx, rec, cb->fn,
835                                                  cb->private_data);
836                 } else {
837                         /*
838                          * This comes from a different process. we are called
839                          * from the event loop, so we should call back
840                          * directly.
841                          */
842                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
843                                rec->src, &rec->buf);
844                 }
845                 /*
846                  * we continue looking for matching messages after finding
847                  * one. This matters for subsystems like the internal notify
848                  * code which register more than one handler for the same
849                  * message type
850                  */
851         }
852
853         if (!messaging_append_new_waiters(msg_ctx)) {
854                 return;
855         }
856
857         i = 0;
858         while (i < msg_ctx->num_waiters) {
859                 struct tevent_req *req;
860                 struct messaging_filtered_read_state *state;
861
862                 req = msg_ctx->waiters[i];
863                 if (req == NULL) {
864                         /*
865                          * This got cleaned up. In the meantime,
866                          * move everything down one. We need
867                          * to keep the order of waiters, as
868                          * other code may depend on this.
869                          */
870                         if (i < msg_ctx->num_waiters - 1) {
871                                 memmove(&msg_ctx->waiters[i],
872                                         &msg_ctx->waiters[i+1],
873                                         sizeof(struct tevent_req *) *
874                                             (msg_ctx->num_waiters - i - 1));
875                         }
876                         msg_ctx->num_waiters -= 1;
877                         continue;
878                 }
879
880                 state = tevent_req_data(
881                         req, struct messaging_filtered_read_state);
882                 if (state->filter(rec, state->private_data)) {
883                         messaging_filtered_read_done(req, rec);
884                 }
885
886                 i += 1;
887         }
888 }
889
890 static int mess_parent_dgm_cleanup(void *private_data);
891 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
892
893 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
894 {
895         struct tevent_req *req;
896
897         req = background_job_send(
898                 msg, msg->event_ctx, msg, NULL, 0,
899                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
900                             60*15),
901                 mess_parent_dgm_cleanup, msg);
902         if (req == NULL) {
903                 return false;
904         }
905         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
906         return true;
907 }
908
909 static int mess_parent_dgm_cleanup(void *private_data)
910 {
911         struct messaging_context *msg_ctx = talloc_get_type_abort(
912                 private_data, struct messaging_context);
913         int ret;
914
915         ret = messaging_dgm_wipe(msg_ctx->local);
916         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
917                    ret ? strerror(ret) : "ok"));
918         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
919                            60*15);
920 }
921
922 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
923 {
924         struct messaging_context *msg = tevent_req_callback_data(
925                 req, struct messaging_context);
926         NTSTATUS status;
927
928         status = background_job_recv(req);
929         TALLOC_FREE(req);
930         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
931                   nt_errstr(status)));
932
933         req = background_job_send(
934                 msg, msg->event_ctx, msg, NULL, 0,
935                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
936                             60*15),
937                 mess_parent_dgm_cleanup, msg);
938         if (req == NULL) {
939                 DEBUG(1, ("background_job_send failed\n"));
940         }
941         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
942 }
943
944 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
945 {
946         int ret;
947
948         if (pid == 0) {
949                 ret = messaging_dgm_wipe(msg_ctx->local);
950         } else {
951                 ret = messaging_dgm_cleanup(msg_ctx->local, pid);
952         }
953
954         return ret;
955 }
956
957 struct tevent_context *messaging_tevent_context(
958         struct messaging_context *msg_ctx)
959 {
960         return msg_ctx->event_ctx;
961 }
962
963 /** @} **/