s3:messaging: use struct initializers for 'struct messaging_rec'
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55
56 struct messaging_callback {
57         struct messaging_callback *prev, *next;
58         uint32 msg_type;
59         void (*fn)(struct messaging_context *msg, void *private_data, 
60                    uint32_t msg_type, 
61                    struct server_id server_id, DATA_BLOB *data);
62         void *private_data;
63 };
64
65 struct messaging_context {
66         struct server_id id;
67         struct tevent_context *event_ctx;
68         struct messaging_callback *callbacks;
69
70         struct tevent_req **new_waiters;
71         unsigned num_new_waiters;
72
73         struct tevent_req **waiters;
74         unsigned num_waiters;
75
76         struct messaging_backend *remote;
77 };
78
79 struct messaging_hdr {
80         int msg_type;
81         struct server_id dst;
82         struct server_id src;
83 };
84
85 /****************************************************************************
86  A useful function for testing the message system.
87 ****************************************************************************/
88
89 static void ping_message(struct messaging_context *msg_ctx,
90                          void *private_data,
91                          uint32_t msg_type,
92                          struct server_id src,
93                          DATA_BLOB *data)
94 {
95         struct server_id_buf idbuf;
96
97         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
98                   server_id_str_buf(src, &idbuf), (int)data->length,
99                   data->data ? (char *)data->data : ""));
100
101         messaging_send(msg_ctx, src, MSG_PONG, data);
102 }
103
104 /****************************************************************************
105  Register/replace a dispatch function for a particular message type.
106  JRA changed Dec 13 2006. Only one message handler now permitted per type.
107  *NOTE*: Dispatch functions must be able to cope with incoming
108  messages on an *odd* byte boundary.
109 ****************************************************************************/
110
111 struct msg_all {
112         struct messaging_context *msg_ctx;
113         int msg_type;
114         uint32 msg_flag;
115         const void *buf;
116         size_t len;
117         int n_sent;
118 };
119
120 /****************************************************************************
121  Send one of the messages for the broadcast.
122 ****************************************************************************/
123
124 static int traverse_fn(struct db_record *rec, const struct server_id *id,
125                        uint32_t msg_flags, void *state)
126 {
127         struct msg_all *msg_all = (struct msg_all *)state;
128         NTSTATUS status;
129
130         /* Don't send if the receiver hasn't registered an interest. */
131
132         if((msg_flags & msg_all->msg_flag) == 0) {
133                 return 0;
134         }
135
136         /* If the msg send fails because the pid was not found (i.e. smbd died), 
137          * the msg has already been deleted from the messages.tdb.*/
138
139         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
140                                     (const uint8_t *)msg_all->buf, msg_all->len);
141
142         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
143                 struct server_id_buf idbuf;
144
145                 /*
146                  * If the pid was not found delete the entry from
147                  * serverid.tdb
148                  */
149
150                 DEBUG(2, ("pid %s doesn't exist\n",
151                           server_id_str_buf(*id, &idbuf)));
152
153                 dbwrap_record_delete(rec);
154         }
155         msg_all->n_sent++;
156         return 0;
157 }
158
159 /**
160  * Send a message to all smbd processes.
161  *
162  * It isn't very efficient, but should be OK for the sorts of
163  * applications that use it. When we need efficient broadcast we can add
164  * it.
165  *
166  * @param n_sent Set to the number of messages sent.  This should be
167  * equal to the number of processes, but be careful for races.
168  *
169  * @retval True for success.
170  **/
171 bool message_send_all(struct messaging_context *msg_ctx,
172                       int msg_type,
173                       const void *buf, size_t len,
174                       int *n_sent)
175 {
176         struct msg_all msg_all;
177
178         msg_all.msg_type = msg_type;
179         if (msg_type < 0x100) {
180                 msg_all.msg_flag = FLAG_MSG_GENERAL;
181         } else if (msg_type > 0x100 && msg_type < 0x200) {
182                 msg_all.msg_flag = FLAG_MSG_NMBD;
183         } else if (msg_type > 0x200 && msg_type < 0x300) {
184                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
185         } else if (msg_type > 0x300 && msg_type < 0x400) {
186                 msg_all.msg_flag = FLAG_MSG_SMBD;
187         } else if (msg_type > 0x400 && msg_type < 0x600) {
188                 msg_all.msg_flag = FLAG_MSG_WINBIND;
189         } else if (msg_type > 4000 && msg_type < 5000) {
190                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
191         } else {
192                 return false;
193         }
194
195         msg_all.buf = buf;
196         msg_all.len = len;
197         msg_all.n_sent = 0;
198         msg_all.msg_ctx = msg_ctx;
199
200         serverid_traverse(traverse_fn, &msg_all);
201         if (n_sent)
202                 *n_sent = msg_all.n_sent;
203         return true;
204 }
205
206 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
207                               void *private_data)
208 {
209         struct messaging_context *msg_ctx = talloc_get_type_abort(
210                 private_data, struct messaging_context);
211         const struct messaging_hdr *hdr;
212         struct server_id_buf idbuf;
213         struct messaging_rec rec;
214
215         if (msg_len < sizeof(*hdr)) {
216                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
217                 return;
218         }
219
220         /*
221          * messages_dgm guarantees alignment, so we can cast here
222          */
223         hdr = (const struct messaging_hdr *)msg;
224
225         DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
226                    (unsigned)hdr->msg_type, (unsigned)(msg_len - sizeof(*hdr)),
227                    server_id_str_buf(hdr->src, &idbuf)));
228
229         rec = (struct messaging_rec) {
230                 .msg_version = MESSAGE_VERSION,
231                 .msg_type = hdr->msg_type,
232                 .src = hdr->src,
233                 .dest = hdr->dst,
234                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
235                 .buf.length = msg_len - sizeof(*hdr)
236         };
237
238         messaging_dispatch_rec(msg_ctx, &rec);
239 }
240
241 static int messaging_context_destructor(struct messaging_context *ctx)
242 {
243         messaging_dgm_destroy();
244         return 0;
245 }
246
247 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
248                                          struct tevent_context *ev)
249 {
250         struct messaging_context *ctx;
251         NTSTATUS status;
252         int ret;
253
254         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
255                 return NULL;
256         }
257
258         ctx->id = procid_self();
259         ctx->event_ctx = ev;
260
261         sec_init();
262
263         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
264                                  lp_cache_directory(), sec_initial_uid(),
265                                  messaging_recv_cb, ctx);
266
267         if (ret != 0) {
268                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
269                 TALLOC_FREE(ctx);
270                 return NULL;
271         }
272
273         talloc_set_destructor(ctx, messaging_context_destructor);
274
275         if (lp_clustering()) {
276                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
277
278                 if (!NT_STATUS_IS_OK(status)) {
279                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
280                                   nt_errstr(status)));
281                         TALLOC_FREE(ctx);
282                         return NULL;
283                 }
284         }
285         ctx->id.vnn = get_my_vnn();
286
287         messaging_register(ctx, NULL, MSG_PING, ping_message);
288
289         /* Register some debugging related messages */
290
291         register_msg_pool_usage(ctx);
292         register_dmalloc_msgs(ctx);
293         debug_register_msgs(ctx);
294
295         return ctx;
296 }
297
298 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
299 {
300         return msg_ctx->id;
301 }
302
303 /*
304  * re-init after a fork
305  */
306 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
307 {
308         NTSTATUS status;
309         int ret;
310
311         messaging_dgm_destroy();
312
313         msg_ctx->id = procid_self();
314
315         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
316                                  lp_cache_directory(), sec_initial_uid(),
317                                  messaging_recv_cb, msg_ctx);
318         if (ret != 0) {
319                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
320                 return map_nt_error_from_unix(ret);
321         }
322
323         TALLOC_FREE(msg_ctx->remote);
324
325         if (lp_clustering()) {
326                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
327                                               &msg_ctx->remote);
328
329                 if (!NT_STATUS_IS_OK(status)) {
330                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
331                                   nt_errstr(status)));
332                         return status;
333                 }
334         }
335
336         return NT_STATUS_OK;
337 }
338
339
340 /*
341  * Register a dispatch function for a particular message type. Allow multiple
342  * registrants
343 */
344 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
345                             void *private_data,
346                             uint32_t msg_type,
347                             void (*fn)(struct messaging_context *msg,
348                                        void *private_data, 
349                                        uint32_t msg_type, 
350                                        struct server_id server_id,
351                                        DATA_BLOB *data))
352 {
353         struct messaging_callback *cb;
354
355         DEBUG(5, ("Registering messaging pointer for type %u - "
356                   "private_data=%p\n",
357                   (unsigned)msg_type, private_data));
358
359         /*
360          * Only one callback per type
361          */
362
363         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
364                 /* we allow a second registration of the same message
365                    type if it has a different private pointer. This is
366                    needed in, for example, the internal notify code,
367                    which creates a new notify context for each tree
368                    connect, and expects to receive messages to each of
369                    them. */
370                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
371                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
372                                   (unsigned)msg_type, private_data));
373                         cb->fn = fn;
374                         cb->private_data = private_data;
375                         return NT_STATUS_OK;
376                 }
377         }
378
379         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
380                 return NT_STATUS_NO_MEMORY;
381         }
382
383         cb->msg_type = msg_type;
384         cb->fn = fn;
385         cb->private_data = private_data;
386
387         DLIST_ADD(msg_ctx->callbacks, cb);
388         return NT_STATUS_OK;
389 }
390
391 /*
392   De-register the function for a particular message type.
393 */
394 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
395                           void *private_data)
396 {
397         struct messaging_callback *cb, *next;
398
399         for (cb = ctx->callbacks; cb; cb = next) {
400                 next = cb->next;
401                 if ((cb->msg_type == msg_type)
402                     && (cb->private_data == private_data)) {
403                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
404                                   (unsigned)msg_type, private_data));
405                         DLIST_REMOVE(ctx->callbacks, cb);
406                         TALLOC_FREE(cb);
407                 }
408         }
409 }
410
411 /*
412   Send a message to a particular server
413 */
414 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
415                         struct server_id server, uint32_t msg_type,
416                         const DATA_BLOB *data)
417 {
418         struct iovec iov;
419
420         iov.iov_base = data->data;
421         iov.iov_len = data->length;
422
423         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
424 }
425
426 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
427                             struct server_id server, uint32_t msg_type,
428                             const uint8_t *buf, size_t len)
429 {
430         DATA_BLOB blob = data_blob_const(buf, len);
431         return messaging_send(msg_ctx, server, msg_type, &blob);
432 }
433
434 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
435                             struct server_id server, uint32_t msg_type,
436                             const struct iovec *iov, int iovlen)
437 {
438         int ret;
439         struct messaging_hdr hdr;
440         struct iovec iov2[iovlen+1];
441
442         if (server_id_is_disconnected(&server)) {
443                 return NT_STATUS_INVALID_PARAMETER_MIX;
444         }
445
446         if (!procid_is_local(&server)) {
447                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
448                                                msg_type, iov, iovlen,
449                                                msg_ctx->remote);
450                 if (ret != 0) {
451                         return map_nt_error_from_unix(ret);
452                 }
453                 return NT_STATUS_OK;
454         }
455
456         if (server_id_same_process(&msg_ctx->id, &server)) {
457                 struct messaging_rec rec;
458                 uint8_t *buf;
459
460                 /*
461                  * Self-send, directly dispatch
462                  */
463
464                 buf = iov_buf(talloc_tos(), iov, iovlen);
465                 if (buf == NULL) {
466                         return NT_STATUS_NO_MEMORY;
467                 }
468
469                 rec = (struct messaging_rec) {
470                         .msg_version = MESSAGE_VERSION,
471                         .msg_type = msg_type & MSG_TYPE_MASK,
472                         .dest = server,
473                         .src = msg_ctx->id,
474                         .buf = data_blob_const(buf, talloc_get_size(buf)),
475                 };
476
477                 messaging_dispatch_rec(msg_ctx, &rec);
478                 TALLOC_FREE(buf);
479                 return NT_STATUS_OK;
480         }
481
482         hdr = (struct messaging_hdr) {
483                 .msg_type = msg_type,
484                 .dst = server,
485                 .src = msg_ctx->id
486         };
487         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
488         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
489
490         become_root();
491         ret = messaging_dgm_send(server.pid, iov2, iovlen+1);
492         unbecome_root();
493
494         if (ret != 0) {
495                 return map_nt_error_from_unix(ret);
496         }
497         return NT_STATUS_OK;
498 }
499
500 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
501                                                struct messaging_rec *rec)
502 {
503         struct messaging_rec *result;
504
505         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
506                                       1, rec->buf.length);
507         if (result == NULL) {
508                 return NULL;
509         }
510         *result = *rec;
511
512         /* Doesn't fail, see talloc_pooled_object */
513
514         result->buf.data = talloc_memdup(result, rec->buf.data,
515                                          rec->buf.length);
516         return result;
517 }
518
519 struct messaging_filtered_read_state {
520         struct tevent_context *ev;
521         struct messaging_context *msg_ctx;
522         void *tevent_handle;
523
524         bool (*filter)(struct messaging_rec *rec, void *private_data);
525         void *private_data;
526
527         struct messaging_rec *rec;
528 };
529
530 static void messaging_filtered_read_cleanup(struct tevent_req *req,
531                                             enum tevent_req_state req_state);
532
533 struct tevent_req *messaging_filtered_read_send(
534         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
535         struct messaging_context *msg_ctx,
536         bool (*filter)(struct messaging_rec *rec, void *private_data),
537         void *private_data)
538 {
539         struct tevent_req *req;
540         struct messaging_filtered_read_state *state;
541         size_t new_waiters_len;
542
543         req = tevent_req_create(mem_ctx, &state,
544                                 struct messaging_filtered_read_state);
545         if (req == NULL) {
546                 return NULL;
547         }
548         state->ev = ev;
549         state->msg_ctx = msg_ctx;
550         state->filter = filter;
551         state->private_data = private_data;
552
553         /*
554          * We have to defer the callback here, as we might be called from
555          * within a different tevent_context than state->ev
556          */
557         tevent_req_defer_callback(req, state->ev);
558
559         state->tevent_handle = messaging_dgm_register_tevent_context(
560                 state, ev);
561         if (tevent_req_nomem(state, req)) {
562                 return tevent_req_post(req, ev);
563         }
564
565         /*
566          * We add ourselves to the "new_waiters" array, not the "waiters"
567          * array. If we are called from within messaging_read_done,
568          * messaging_dispatch_rec will be in an active for-loop on
569          * "waiters". We must be careful not to mess with this array, because
570          * it could mean that a single event is being delivered twice.
571          */
572
573         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
574
575         if (new_waiters_len == msg_ctx->num_new_waiters) {
576                 struct tevent_req **tmp;
577
578                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
579                                      struct tevent_req *, new_waiters_len+1);
580                 if (tevent_req_nomem(tmp, req)) {
581                         return tevent_req_post(req, ev);
582                 }
583                 msg_ctx->new_waiters = tmp;
584         }
585
586         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
587         msg_ctx->num_new_waiters += 1;
588         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
589
590         return req;
591 }
592
593 static void messaging_filtered_read_cleanup(struct tevent_req *req,
594                                             enum tevent_req_state req_state)
595 {
596         struct messaging_filtered_read_state *state = tevent_req_data(
597                 req, struct messaging_filtered_read_state);
598         struct messaging_context *msg_ctx = state->msg_ctx;
599         unsigned i;
600
601         tevent_req_set_cleanup_fn(req, NULL);
602
603         TALLOC_FREE(state->tevent_handle);
604
605         /*
606          * Just set the [new_]waiters entry to NULL, be careful not to mess
607          * with the other "waiters" array contents. We are often called from
608          * within "messaging_dispatch_rec", which loops over
609          * "waiters". Messing with the "waiters" array will mess up that
610          * for-loop.
611          */
612
613         for (i=0; i<msg_ctx->num_waiters; i++) {
614                 if (msg_ctx->waiters[i] == req) {
615                         msg_ctx->waiters[i] = NULL;
616                         return;
617                 }
618         }
619
620         for (i=0; i<msg_ctx->num_new_waiters; i++) {
621                 if (msg_ctx->new_waiters[i] == req) {
622                         msg_ctx->new_waiters[i] = NULL;
623                         return;
624                 }
625         }
626 }
627
628 static void messaging_filtered_read_done(struct tevent_req *req,
629                                          struct messaging_rec *rec)
630 {
631         struct messaging_filtered_read_state *state = tevent_req_data(
632                 req, struct messaging_filtered_read_state);
633
634         state->rec = messaging_rec_dup(state, rec);
635         if (tevent_req_nomem(state->rec, req)) {
636                 return;
637         }
638         tevent_req_done(req);
639 }
640
641 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
642                                  struct messaging_rec **presult)
643 {
644         struct messaging_filtered_read_state *state = tevent_req_data(
645                 req, struct messaging_filtered_read_state);
646         int err;
647
648         if (tevent_req_is_unix_error(req, &err)) {
649                 tevent_req_received(req);
650                 return err;
651         }
652         *presult = talloc_move(mem_ctx, &state->rec);
653         return 0;
654 }
655
656 struct messaging_read_state {
657         uint32_t msg_type;
658         struct messaging_rec *rec;
659 };
660
661 static bool messaging_read_filter(struct messaging_rec *rec,
662                                   void *private_data);
663 static void messaging_read_done(struct tevent_req *subreq);
664
665 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
666                                        struct tevent_context *ev,
667                                        struct messaging_context *msg,
668                                        uint32_t msg_type)
669 {
670         struct tevent_req *req, *subreq;
671         struct messaging_read_state *state;
672
673         req = tevent_req_create(mem_ctx, &state,
674                                 struct messaging_read_state);
675         if (req == NULL) {
676                 return NULL;
677         }
678         state->msg_type = msg_type;
679
680         subreq = messaging_filtered_read_send(state, ev, msg,
681                                               messaging_read_filter, state);
682         if (tevent_req_nomem(subreq, req)) {
683                 return tevent_req_post(req, ev);
684         }
685         tevent_req_set_callback(subreq, messaging_read_done, req);
686         return req;
687 }
688
689 static bool messaging_read_filter(struct messaging_rec *rec,
690                                   void *private_data)
691 {
692         struct messaging_read_state *state = talloc_get_type_abort(
693                 private_data, struct messaging_read_state);
694
695         return rec->msg_type == state->msg_type;
696 }
697
698 static void messaging_read_done(struct tevent_req *subreq)
699 {
700         struct tevent_req *req = tevent_req_callback_data(
701                 subreq, struct tevent_req);
702         struct messaging_read_state *state = tevent_req_data(
703                 req, struct messaging_read_state);
704         int ret;
705
706         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
707         TALLOC_FREE(subreq);
708         if (tevent_req_error(req, ret)) {
709                 return;
710         }
711         tevent_req_done(req);
712 }
713
714 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
715                         struct messaging_rec **presult)
716 {
717         struct messaging_read_state *state = tevent_req_data(
718                 req, struct messaging_read_state);
719         int err;
720
721         if (tevent_req_is_unix_error(req, &err)) {
722                 return err;
723         }
724         if (presult != NULL) {
725                 *presult = talloc_move(mem_ctx, &state->rec);
726         }
727         return 0;
728 }
729
730 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
731 {
732         if (msg_ctx->num_new_waiters == 0) {
733                 return true;
734         }
735
736         if (talloc_array_length(msg_ctx->waiters) <
737             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
738                 struct tevent_req **tmp;
739                 tmp = talloc_realloc(
740                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
741                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
742                 if (tmp == NULL) {
743                         DEBUG(1, ("%s: talloc failed\n", __func__));
744                         return false;
745                 }
746                 msg_ctx->waiters = tmp;
747         }
748
749         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
750                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
751
752         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
753         msg_ctx->num_new_waiters = 0;
754
755         return true;
756 }
757
758 struct messaging_defer_callback_state {
759         struct messaging_context *msg_ctx;
760         struct messaging_rec *rec;
761         void (*fn)(struct messaging_context *msg, void *private_data,
762                    uint32_t msg_type, struct server_id server_id,
763                    DATA_BLOB *data);
764         void *private_data;
765 };
766
767 static void messaging_defer_callback_trigger(struct tevent_context *ev,
768                                              struct tevent_immediate *im,
769                                              void *private_data);
770
771 static void messaging_defer_callback(
772         struct messaging_context *msg_ctx, struct messaging_rec *rec,
773         void (*fn)(struct messaging_context *msg, void *private_data,
774                    uint32_t msg_type, struct server_id server_id,
775                    DATA_BLOB *data),
776         void *private_data)
777 {
778         struct messaging_defer_callback_state *state;
779         struct tevent_immediate *im;
780
781         state = talloc(msg_ctx, struct messaging_defer_callback_state);
782         if (state == NULL) {
783                 DEBUG(1, ("talloc failed\n"));
784                 return;
785         }
786         state->msg_ctx = msg_ctx;
787         state->fn = fn;
788         state->private_data = private_data;
789
790         state->rec = messaging_rec_dup(state, rec);
791         if (state->rec == NULL) {
792                 DEBUG(1, ("talloc failed\n"));
793                 TALLOC_FREE(state);
794                 return;
795         }
796
797         im = tevent_create_immediate(state);
798         if (im == NULL) {
799                 DEBUG(1, ("tevent_create_immediate failed\n"));
800                 TALLOC_FREE(state);
801                 return;
802         }
803         tevent_schedule_immediate(im, msg_ctx->event_ctx,
804                                   messaging_defer_callback_trigger, state);
805 }
806
807 static void messaging_defer_callback_trigger(struct tevent_context *ev,
808                                              struct tevent_immediate *im,
809                                              void *private_data)
810 {
811         struct messaging_defer_callback_state *state = talloc_get_type_abort(
812                 private_data, struct messaging_defer_callback_state);
813         struct messaging_rec *rec = state->rec;
814
815         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
816                   &rec->buf);
817         TALLOC_FREE(state);
818 }
819
820 /*
821   Dispatch one messaging_rec
822 */
823 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
824                             struct messaging_rec *rec)
825 {
826         struct messaging_callback *cb, *next;
827         unsigned i;
828
829         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
830                 next = cb->next;
831                 if (cb->msg_type != rec->msg_type) {
832                         continue;
833                 }
834
835                 if (server_id_same_process(&rec->src, &rec->dest)) {
836                         /*
837                          * This is a self-send. We are called here from
838                          * messaging_send(), and we don't want to directly
839                          * recurse into the callback but go via a
840                          * tevent_loop_once
841                          */
842                         messaging_defer_callback(msg_ctx, rec, cb->fn,
843                                                  cb->private_data);
844                 } else {
845                         /*
846                          * This comes from a different process. we are called
847                          * from the event loop, so we should call back
848                          * directly.
849                          */
850                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
851                                rec->src, &rec->buf);
852                 }
853                 /*
854                  * we continue looking for matching messages after finding
855                  * one. This matters for subsystems like the internal notify
856                  * code which register more than one handler for the same
857                  * message type
858                  */
859         }
860
861         if (!messaging_append_new_waiters(msg_ctx)) {
862                 return;
863         }
864
865         i = 0;
866         while (i < msg_ctx->num_waiters) {
867                 struct tevent_req *req;
868                 struct messaging_filtered_read_state *state;
869
870                 req = msg_ctx->waiters[i];
871                 if (req == NULL) {
872                         /*
873                          * This got cleaned up. In the meantime,
874                          * move everything down one. We need
875                          * to keep the order of waiters, as
876                          * other code may depend on this.
877                          */
878                         if (i < msg_ctx->num_waiters - 1) {
879                                 memmove(&msg_ctx->waiters[i],
880                                         &msg_ctx->waiters[i+1],
881                                         sizeof(struct tevent_req *) *
882                                             (msg_ctx->num_waiters - i - 1));
883                         }
884                         msg_ctx->num_waiters -= 1;
885                         continue;
886                 }
887
888                 state = tevent_req_data(
889                         req, struct messaging_filtered_read_state);
890                 if (state->filter(rec, state->private_data)) {
891                         messaging_filtered_read_done(req, rec);
892                 }
893
894                 i += 1;
895         }
896 }
897
898 static int mess_parent_dgm_cleanup(void *private_data);
899 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
900
901 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
902 {
903         struct tevent_req *req;
904
905         req = background_job_send(
906                 msg, msg->event_ctx, msg, NULL, 0,
907                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
908                             60*15),
909                 mess_parent_dgm_cleanup, msg);
910         if (req == NULL) {
911                 return false;
912         }
913         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
914         return true;
915 }
916
917 static int mess_parent_dgm_cleanup(void *private_data)
918 {
919         int ret;
920
921         ret = messaging_dgm_wipe();
922         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
923                    ret ? strerror(ret) : "ok"));
924         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
925                            60*15);
926 }
927
928 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
929 {
930         struct messaging_context *msg = tevent_req_callback_data(
931                 req, struct messaging_context);
932         NTSTATUS status;
933
934         status = background_job_recv(req);
935         TALLOC_FREE(req);
936         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
937                   nt_errstr(status)));
938
939         req = background_job_send(
940                 msg, msg->event_ctx, msg, NULL, 0,
941                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
942                             60*15),
943                 mess_parent_dgm_cleanup, msg);
944         if (req == NULL) {
945                 DEBUG(1, ("background_job_send failed\n"));
946         }
947         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
948 }
949
950 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
951 {
952         int ret;
953
954         if (pid == 0) {
955                 ret = messaging_dgm_wipe();
956         } else {
957                 ret = messaging_dgm_cleanup(pid);
958         }
959
960         return ret;
961 }
962
963 struct tevent_context *messaging_tevent_context(
964         struct messaging_context *msg_ctx)
965 {
966         return msg_ctx->event_ctx;
967 }
968
969 /** @} **/