messaging3: Move [un]become_root() calls out of messaging_dgm_send()
[nivanova/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_dgm_context *local;
76
77         struct messaging_backend *remote;
78
79         bool *have_context;
80 };
81
82 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83
84 /****************************************************************************
85  A useful function for testing the message system.
86 ****************************************************************************/
87
88 static void ping_message(struct messaging_context *msg_ctx,
89                          void *private_data,
90                          uint32_t msg_type,
91                          struct server_id src,
92                          DATA_BLOB *data)
93 {
94         struct server_id_buf idbuf;
95
96         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
97                   server_id_str_buf(src, &idbuf), (int)data->length,
98                   data->data ? (char *)data->data : ""));
99
100         messaging_send(msg_ctx, src, MSG_PONG, data);
101 }
102
103 /****************************************************************************
104  Register/replace a dispatch function for a particular message type.
105  JRA changed Dec 13 2006. Only one message handler now permitted per type.
106  *NOTE*: Dispatch functions must be able to cope with incoming
107  messages on an *odd* byte boundary.
108 ****************************************************************************/
109
110 struct msg_all {
111         struct messaging_context *msg_ctx;
112         int msg_type;
113         uint32 msg_flag;
114         const void *buf;
115         size_t len;
116         int n_sent;
117 };
118
119 /****************************************************************************
120  Send one of the messages for the broadcast.
121 ****************************************************************************/
122
123 static int traverse_fn(struct db_record *rec, const struct server_id *id,
124                        uint32_t msg_flags, void *state)
125 {
126         struct msg_all *msg_all = (struct msg_all *)state;
127         NTSTATUS status;
128
129         /* Don't send if the receiver hasn't registered an interest. */
130
131         if((msg_flags & msg_all->msg_flag) == 0) {
132                 return 0;
133         }
134
135         /* If the msg send fails because the pid was not found (i.e. smbd died), 
136          * the msg has already been deleted from the messages.tdb.*/
137
138         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
139                                     (const uint8_t *)msg_all->buf, msg_all->len);
140
141         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
142                 struct server_id_buf idbuf;
143
144                 /*
145                  * If the pid was not found delete the entry from
146                  * serverid.tdb
147                  */
148
149                 DEBUG(2, ("pid %s doesn't exist\n",
150                           server_id_str_buf(*id, &idbuf)));
151
152                 dbwrap_record_delete(rec);
153         }
154         msg_all->n_sent++;
155         return 0;
156 }
157
158 /**
159  * Send a message to all smbd processes.
160  *
161  * It isn't very efficient, but should be OK for the sorts of
162  * applications that use it. When we need efficient broadcast we can add
163  * it.
164  *
165  * @param n_sent Set to the number of messages sent.  This should be
166  * equal to the number of processes, but be careful for races.
167  *
168  * @retval True for success.
169  **/
170 bool message_send_all(struct messaging_context *msg_ctx,
171                       int msg_type,
172                       const void *buf, size_t len,
173                       int *n_sent)
174 {
175         struct msg_all msg_all;
176
177         msg_all.msg_type = msg_type;
178         if (msg_type < 0x100) {
179                 msg_all.msg_flag = FLAG_MSG_GENERAL;
180         } else if (msg_type > 0x100 && msg_type < 0x200) {
181                 msg_all.msg_flag = FLAG_MSG_NMBD;
182         } else if (msg_type > 0x200 && msg_type < 0x300) {
183                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
184         } else if (msg_type > 0x300 && msg_type < 0x400) {
185                 msg_all.msg_flag = FLAG_MSG_SMBD;
186         } else if (msg_type > 0x400 && msg_type < 0x600) {
187                 msg_all.msg_flag = FLAG_MSG_WINBIND;
188         } else if (msg_type > 4000 && msg_type < 5000) {
189                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
190         } else {
191                 return false;
192         }
193
194         msg_all.buf = buf;
195         msg_all.len = len;
196         msg_all.n_sent = 0;
197         msg_all.msg_ctx = msg_ctx;
198
199         serverid_traverse(traverse_fn, &msg_all);
200         if (n_sent)
201                 *n_sent = msg_all.n_sent;
202         return true;
203 }
204
205 static void messaging_recv_cb(int msg_type,
206                               struct server_id src, struct server_id dst,
207                               const uint8_t *msg, size_t msg_len,
208                               void *private_data)
209 {
210         struct messaging_context *msg_ctx = talloc_get_type_abort(
211                 private_data, struct messaging_context);
212         struct messaging_rec rec;
213
214         rec = (struct messaging_rec) {
215                 .msg_version = MESSAGE_VERSION,
216                 .msg_type = msg_type,
217                 .src = src,
218                 .dest = dst,
219                 .buf.data = discard_const_p(uint8, msg),
220                 .buf.length = msg_len
221         };
222
223         messaging_dispatch_rec(msg_ctx, &rec);
224 }
225
226 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
227                                          struct tevent_context *ev)
228 {
229         struct messaging_context *ctx;
230         NTSTATUS status;
231         int ret;
232         static bool have_context = false;
233
234         if (have_context) {
235                 DEBUG(0, ("No two messaging contexts per process\n"));
236                 return NULL;
237         }
238
239
240         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
241                 return NULL;
242         }
243
244         ctx->id = procid_self();
245         ctx->event_ctx = ev;
246         ctx->have_context = &have_context;
247
248         sec_init();
249
250         ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
251                                  messaging_recv_cb, ctx, &ctx->local);
252
253         if (ret != 0) {
254                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
255                 TALLOC_FREE(ctx);
256                 return NULL;
257         }
258
259         if (lp_clustering()) {
260                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
261
262                 if (!NT_STATUS_IS_OK(status)) {
263                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
264                                   nt_errstr(status)));
265                         TALLOC_FREE(ctx);
266                         return NULL;
267                 }
268         }
269         ctx->id.vnn = get_my_vnn();
270
271         messaging_register(ctx, NULL, MSG_PING, ping_message);
272
273         /* Register some debugging related messages */
274
275         register_msg_pool_usage(ctx);
276         register_dmalloc_msgs(ctx);
277         debug_register_msgs(ctx);
278
279         have_context = true;
280         talloc_set_destructor(ctx, messaging_context_destructor);
281
282         return ctx;
283 }
284
285 static int messaging_context_destructor(struct messaging_context *msg_ctx)
286 {
287         SMB_ASSERT(*msg_ctx->have_context);
288         *msg_ctx->have_context = false;
289         return 0;
290 }
291
292 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
293 {
294         return msg_ctx->id;
295 }
296
297 /*
298  * re-init after a fork
299  */
300 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
301 {
302         NTSTATUS status;
303         int ret;
304
305         TALLOC_FREE(msg_ctx->local);
306
307         msg_ctx->id = procid_self();
308
309         ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx,
310                                  msg_ctx->id, messaging_recv_cb, msg_ctx,
311                                  &msg_ctx->local);
312         if (ret != 0) {
313                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
314                 return map_nt_error_from_unix(ret);
315         }
316
317         TALLOC_FREE(msg_ctx->remote);
318
319         if (lp_clustering()) {
320                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
321                                               &msg_ctx->remote);
322
323                 if (!NT_STATUS_IS_OK(status)) {
324                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
325                                   nt_errstr(status)));
326                         return status;
327                 }
328         }
329
330         return NT_STATUS_OK;
331 }
332
333
334 /*
335  * Register a dispatch function for a particular message type. Allow multiple
336  * registrants
337 */
338 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
339                             void *private_data,
340                             uint32_t msg_type,
341                             void (*fn)(struct messaging_context *msg,
342                                        void *private_data, 
343                                        uint32_t msg_type, 
344                                        struct server_id server_id,
345                                        DATA_BLOB *data))
346 {
347         struct messaging_callback *cb;
348
349         DEBUG(5, ("Registering messaging pointer for type %u - "
350                   "private_data=%p\n",
351                   (unsigned)msg_type, private_data));
352
353         /*
354          * Only one callback per type
355          */
356
357         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
358                 /* we allow a second registration of the same message
359                    type if it has a different private pointer. This is
360                    needed in, for example, the internal notify code,
361                    which creates a new notify context for each tree
362                    connect, and expects to receive messages to each of
363                    them. */
364                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
365                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
366                                   (unsigned)msg_type, private_data));
367                         cb->fn = fn;
368                         cb->private_data = private_data;
369                         return NT_STATUS_OK;
370                 }
371         }
372
373         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
374                 return NT_STATUS_NO_MEMORY;
375         }
376
377         cb->msg_type = msg_type;
378         cb->fn = fn;
379         cb->private_data = private_data;
380
381         DLIST_ADD(msg_ctx->callbacks, cb);
382         return NT_STATUS_OK;
383 }
384
385 /*
386   De-register the function for a particular message type.
387 */
388 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
389                           void *private_data)
390 {
391         struct messaging_callback *cb, *next;
392
393         for (cb = ctx->callbacks; cb; cb = next) {
394                 next = cb->next;
395                 if ((cb->msg_type == msg_type)
396                     && (cb->private_data == private_data)) {
397                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
398                                   (unsigned)msg_type, private_data));
399                         DLIST_REMOVE(ctx->callbacks, cb);
400                         TALLOC_FREE(cb);
401                 }
402         }
403 }
404
405 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
406                                    const struct server_id *dst)
407 {
408         return ((msg_ctx->id.vnn == dst->vnn) &&
409                 (msg_ctx->id.pid == dst->pid));
410 }
411
412 /*
413   Send a message to a particular server
414 */
415 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
416                         struct server_id server, uint32_t msg_type,
417                         const DATA_BLOB *data)
418 {
419         struct iovec iov;
420
421         iov.iov_base = data->data;
422         iov.iov_len = data->length;
423
424         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
425 }
426
427 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
428                             struct server_id server, uint32_t msg_type,
429                             const uint8_t *buf, size_t len)
430 {
431         DATA_BLOB blob = data_blob_const(buf, len);
432         return messaging_send(msg_ctx, server, msg_type, &blob);
433 }
434
435 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
436                             struct server_id server, uint32_t msg_type,
437                             const struct iovec *iov, int iovlen)
438 {
439         int ret;
440
441         if (server_id_is_disconnected(&server)) {
442                 return NT_STATUS_INVALID_PARAMETER_MIX;
443         }
444
445         if (!procid_is_local(&server)) {
446                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
447                                                msg_type, iov, iovlen,
448                                                msg_ctx->remote);
449                 if (ret != 0) {
450                         return map_nt_error_from_unix(ret);
451                 }
452                 return NT_STATUS_OK;
453         }
454
455         if (messaging_is_self_send(msg_ctx, &server)) {
456                 struct messaging_rec rec;
457                 uint8_t *buf;
458
459                 buf = iov_buf(talloc_tos(), iov, iovlen);
460                 if (buf == NULL) {
461                         return NT_STATUS_NO_MEMORY;
462                 }
463
464                 rec.msg_version = MESSAGE_VERSION;
465                 rec.msg_type = msg_type & MSG_TYPE_MASK;
466                 rec.dest = server;
467                 rec.src = msg_ctx->id;
468                 rec.buf = data_blob_const(buf, talloc_get_size(buf));
469                 messaging_dispatch_rec(msg_ctx, &rec);
470                 TALLOC_FREE(buf);
471                 return NT_STATUS_OK;
472         }
473
474         become_root();
475         ret = messaging_dgm_send(msg_ctx->local, msg_ctx->id, server, msg_type,
476                                  iov, iovlen);
477         unbecome_root();
478
479         if (ret != 0) {
480                 return map_nt_error_from_unix(ret);
481         }
482         return NT_STATUS_OK;
483 }
484
485 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
486                                                struct messaging_rec *rec)
487 {
488         struct messaging_rec *result;
489
490         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
491                                       1, rec->buf.length);
492         if (result == NULL) {
493                 return NULL;
494         }
495         *result = *rec;
496
497         /* Doesn't fail, see talloc_pooled_object */
498
499         result->buf.data = talloc_memdup(result, rec->buf.data,
500                                          rec->buf.length);
501         return result;
502 }
503
504 struct messaging_filtered_read_state {
505         struct tevent_context *ev;
506         struct messaging_context *msg_ctx;
507         void *tevent_handle;
508
509         bool (*filter)(struct messaging_rec *rec, void *private_data);
510         void *private_data;
511
512         struct messaging_rec *rec;
513 };
514
515 static void messaging_filtered_read_cleanup(struct tevent_req *req,
516                                             enum tevent_req_state req_state);
517
518 struct tevent_req *messaging_filtered_read_send(
519         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
520         struct messaging_context *msg_ctx,
521         bool (*filter)(struct messaging_rec *rec, void *private_data),
522         void *private_data)
523 {
524         struct tevent_req *req;
525         struct messaging_filtered_read_state *state;
526         size_t new_waiters_len;
527
528         req = tevent_req_create(mem_ctx, &state,
529                                 struct messaging_filtered_read_state);
530         if (req == NULL) {
531                 return NULL;
532         }
533         state->ev = ev;
534         state->msg_ctx = msg_ctx;
535         state->filter = filter;
536         state->private_data = private_data;
537
538         /*
539          * We have to defer the callback here, as we might be called from
540          * within a different tevent_context than state->ev
541          */
542         tevent_req_defer_callback(req, state->ev);
543
544         state->tevent_handle = messaging_dgm_register_tevent_context(
545                 state, msg_ctx->local, ev);
546         if (tevent_req_nomem(state, req)) {
547                 return tevent_req_post(req, ev);
548         }
549
550         /*
551          * We add ourselves to the "new_waiters" array, not the "waiters"
552          * array. If we are called from within messaging_read_done,
553          * messaging_dispatch_rec will be in an active for-loop on
554          * "waiters". We must be careful not to mess with this array, because
555          * it could mean that a single event is being delivered twice.
556          */
557
558         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
559
560         if (new_waiters_len == msg_ctx->num_new_waiters) {
561                 struct tevent_req **tmp;
562
563                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
564                                      struct tevent_req *, new_waiters_len+1);
565                 if (tevent_req_nomem(tmp, req)) {
566                         return tevent_req_post(req, ev);
567                 }
568                 msg_ctx->new_waiters = tmp;
569         }
570
571         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
572         msg_ctx->num_new_waiters += 1;
573         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
574
575         return req;
576 }
577
578 static void messaging_filtered_read_cleanup(struct tevent_req *req,
579                                             enum tevent_req_state req_state)
580 {
581         struct messaging_filtered_read_state *state = tevent_req_data(
582                 req, struct messaging_filtered_read_state);
583         struct messaging_context *msg_ctx = state->msg_ctx;
584         unsigned i;
585
586         tevent_req_set_cleanup_fn(req, NULL);
587
588         TALLOC_FREE(state->tevent_handle);
589
590         /*
591          * Just set the [new_]waiters entry to NULL, be careful not to mess
592          * with the other "waiters" array contents. We are often called from
593          * within "messaging_dispatch_rec", which loops over
594          * "waiters". Messing with the "waiters" array will mess up that
595          * for-loop.
596          */
597
598         for (i=0; i<msg_ctx->num_waiters; i++) {
599                 if (msg_ctx->waiters[i] == req) {
600                         msg_ctx->waiters[i] = NULL;
601                         return;
602                 }
603         }
604
605         for (i=0; i<msg_ctx->num_new_waiters; i++) {
606                 if (msg_ctx->new_waiters[i] == req) {
607                         msg_ctx->new_waiters[i] = NULL;
608                         return;
609                 }
610         }
611 }
612
613 static void messaging_filtered_read_done(struct tevent_req *req,
614                                          struct messaging_rec *rec)
615 {
616         struct messaging_filtered_read_state *state = tevent_req_data(
617                 req, struct messaging_filtered_read_state);
618
619         state->rec = messaging_rec_dup(state, rec);
620         if (tevent_req_nomem(state->rec, req)) {
621                 return;
622         }
623         tevent_req_done(req);
624 }
625
626 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
627                                  struct messaging_rec **presult)
628 {
629         struct messaging_filtered_read_state *state = tevent_req_data(
630                 req, struct messaging_filtered_read_state);
631         int err;
632
633         if (tevent_req_is_unix_error(req, &err)) {
634                 tevent_req_received(req);
635                 return err;
636         }
637         *presult = talloc_move(mem_ctx, &state->rec);
638         return 0;
639 }
640
641 struct messaging_read_state {
642         uint32_t msg_type;
643         struct messaging_rec *rec;
644 };
645
646 static bool messaging_read_filter(struct messaging_rec *rec,
647                                   void *private_data);
648 static void messaging_read_done(struct tevent_req *subreq);
649
650 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
651                                        struct tevent_context *ev,
652                                        struct messaging_context *msg,
653                                        uint32_t msg_type)
654 {
655         struct tevent_req *req, *subreq;
656         struct messaging_read_state *state;
657
658         req = tevent_req_create(mem_ctx, &state,
659                                 struct messaging_read_state);
660         if (req == NULL) {
661                 return NULL;
662         }
663         state->msg_type = msg_type;
664
665         subreq = messaging_filtered_read_send(state, ev, msg,
666                                               messaging_read_filter, state);
667         if (tevent_req_nomem(subreq, req)) {
668                 return tevent_req_post(req, ev);
669         }
670         tevent_req_set_callback(subreq, messaging_read_done, req);
671         return req;
672 }
673
674 static bool messaging_read_filter(struct messaging_rec *rec,
675                                   void *private_data)
676 {
677         struct messaging_read_state *state = talloc_get_type_abort(
678                 private_data, struct messaging_read_state);
679
680         return rec->msg_type == state->msg_type;
681 }
682
683 static void messaging_read_done(struct tevent_req *subreq)
684 {
685         struct tevent_req *req = tevent_req_callback_data(
686                 subreq, struct tevent_req);
687         struct messaging_read_state *state = tevent_req_data(
688                 req, struct messaging_read_state);
689         int ret;
690
691         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
692         TALLOC_FREE(subreq);
693         if (tevent_req_error(req, ret)) {
694                 return;
695         }
696         tevent_req_done(req);
697 }
698
699 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
700                         struct messaging_rec **presult)
701 {
702         struct messaging_read_state *state = tevent_req_data(
703                 req, struct messaging_read_state);
704         int err;
705
706         if (tevent_req_is_unix_error(req, &err)) {
707                 return err;
708         }
709         if (presult != NULL) {
710                 *presult = talloc_move(mem_ctx, &state->rec);
711         }
712         return 0;
713 }
714
715 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
716 {
717         if (msg_ctx->num_new_waiters == 0) {
718                 return true;
719         }
720
721         if (talloc_array_length(msg_ctx->waiters) <
722             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
723                 struct tevent_req **tmp;
724                 tmp = talloc_realloc(
725                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
726                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
727                 if (tmp == NULL) {
728                         DEBUG(1, ("%s: talloc failed\n", __func__));
729                         return false;
730                 }
731                 msg_ctx->waiters = tmp;
732         }
733
734         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
735                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
736
737         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
738         msg_ctx->num_new_waiters = 0;
739
740         return true;
741 }
742
743 struct messaging_defer_callback_state {
744         struct messaging_context *msg_ctx;
745         struct messaging_rec *rec;
746         void (*fn)(struct messaging_context *msg, void *private_data,
747                    uint32_t msg_type, struct server_id server_id,
748                    DATA_BLOB *data);
749         void *private_data;
750 };
751
752 static void messaging_defer_callback_trigger(struct tevent_context *ev,
753                                              struct tevent_immediate *im,
754                                              void *private_data);
755
756 static void messaging_defer_callback(
757         struct messaging_context *msg_ctx, struct messaging_rec *rec,
758         void (*fn)(struct messaging_context *msg, void *private_data,
759                    uint32_t msg_type, struct server_id server_id,
760                    DATA_BLOB *data),
761         void *private_data)
762 {
763         struct messaging_defer_callback_state *state;
764         struct tevent_immediate *im;
765
766         state = talloc(msg_ctx, struct messaging_defer_callback_state);
767         if (state == NULL) {
768                 DEBUG(1, ("talloc failed\n"));
769                 return;
770         }
771         state->msg_ctx = msg_ctx;
772         state->fn = fn;
773         state->private_data = private_data;
774
775         state->rec = messaging_rec_dup(state, rec);
776         if (state->rec == NULL) {
777                 DEBUG(1, ("talloc failed\n"));
778                 TALLOC_FREE(state);
779                 return;
780         }
781
782         im = tevent_create_immediate(state);
783         if (im == NULL) {
784                 DEBUG(1, ("tevent_create_immediate failed\n"));
785                 TALLOC_FREE(state);
786                 return;
787         }
788         tevent_schedule_immediate(im, msg_ctx->event_ctx,
789                                   messaging_defer_callback_trigger, state);
790 }
791
792 static void messaging_defer_callback_trigger(struct tevent_context *ev,
793                                              struct tevent_immediate *im,
794                                              void *private_data)
795 {
796         struct messaging_defer_callback_state *state = talloc_get_type_abort(
797                 private_data, struct messaging_defer_callback_state);
798         struct messaging_rec *rec = state->rec;
799
800         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
801                   &rec->buf);
802         TALLOC_FREE(state);
803 }
804
805 /*
806   Dispatch one messaging_rec
807 */
808 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
809                             struct messaging_rec *rec)
810 {
811         struct messaging_callback *cb, *next;
812         unsigned i;
813
814         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
815                 next = cb->next;
816                 if (cb->msg_type != rec->msg_type) {
817                         continue;
818                 }
819
820                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
821                         /*
822                          * This is a self-send. We are called here from
823                          * messaging_send(), and we don't want to directly
824                          * recurse into the callback but go via a
825                          * tevent_loop_once
826                          */
827                         messaging_defer_callback(msg_ctx, rec, cb->fn,
828                                                  cb->private_data);
829                 } else {
830                         /*
831                          * This comes from a different process. we are called
832                          * from the event loop, so we should call back
833                          * directly.
834                          */
835                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
836                                rec->src, &rec->buf);
837                 }
838                 /*
839                  * we continue looking for matching messages after finding
840                  * one. This matters for subsystems like the internal notify
841                  * code which register more than one handler for the same
842                  * message type
843                  */
844         }
845
846         if (!messaging_append_new_waiters(msg_ctx)) {
847                 return;
848         }
849
850         i = 0;
851         while (i < msg_ctx->num_waiters) {
852                 struct tevent_req *req;
853                 struct messaging_filtered_read_state *state;
854
855                 req = msg_ctx->waiters[i];
856                 if (req == NULL) {
857                         /*
858                          * This got cleaned up. In the meantime,
859                          * move everything down one. We need
860                          * to keep the order of waiters, as
861                          * other code may depend on this.
862                          */
863                         if (i < msg_ctx->num_waiters - 1) {
864                                 memmove(&msg_ctx->waiters[i],
865                                         &msg_ctx->waiters[i+1],
866                                         sizeof(struct tevent_req *) *
867                                             (msg_ctx->num_waiters - i - 1));
868                         }
869                         msg_ctx->num_waiters -= 1;
870                         continue;
871                 }
872
873                 state = tevent_req_data(
874                         req, struct messaging_filtered_read_state);
875                 if (state->filter(rec, state->private_data)) {
876                         messaging_filtered_read_done(req, rec);
877                 }
878
879                 i += 1;
880         }
881 }
882
883 static int mess_parent_dgm_cleanup(void *private_data);
884 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
885
886 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
887 {
888         struct tevent_req *req;
889
890         req = background_job_send(
891                 msg, msg->event_ctx, msg, NULL, 0,
892                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
893                             60*15),
894                 mess_parent_dgm_cleanup, msg);
895         if (req == NULL) {
896                 return false;
897         }
898         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
899         return true;
900 }
901
902 static int mess_parent_dgm_cleanup(void *private_data)
903 {
904         struct messaging_context *msg_ctx = talloc_get_type_abort(
905                 private_data, struct messaging_context);
906         int ret;
907
908         ret = messaging_dgm_wipe(msg_ctx->local);
909         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
910                    ret ? strerror(ret) : "ok"));
911         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
912                            60*15);
913 }
914
915 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
916 {
917         struct messaging_context *msg = tevent_req_callback_data(
918                 req, struct messaging_context);
919         NTSTATUS status;
920
921         status = background_job_recv(req);
922         TALLOC_FREE(req);
923         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
924                   nt_errstr(status)));
925
926         req = background_job_send(
927                 msg, msg->event_ctx, msg, NULL, 0,
928                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
929                             60*15),
930                 mess_parent_dgm_cleanup, msg);
931         if (req == NULL) {
932                 DEBUG(1, ("background_job_send failed\n"));
933         }
934         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
935 }
936
937 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
938 {
939         int ret;
940
941         if (pid == 0) {
942                 ret = messaging_dgm_wipe(msg_ctx->local);
943         } else {
944                 ret = messaging_dgm_cleanup(msg_ctx->local, pid);
945         }
946
947         return ret;
948 }
949
950 struct tevent_context *messaging_tevent_context(
951         struct messaging_context *msg_ctx)
952 {
953         return msg_ctx->event_ctx;
954 }
955
956 /** @} **/