messaging3: Directly refer to messaging_dgm in messages.c
[nivanova/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_dgm_context *local;
76
77         struct messaging_backend *remote;
78
79         bool *have_context;
80 };
81
82 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83
84 /****************************************************************************
85  A useful function for testing the message system.
86 ****************************************************************************/
87
88 static void ping_message(struct messaging_context *msg_ctx,
89                          void *private_data,
90                          uint32_t msg_type,
91                          struct server_id src,
92                          DATA_BLOB *data)
93 {
94         struct server_id_buf idbuf;
95
96         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
97                   server_id_str_buf(src, &idbuf), (int)data->length,
98                   data->data ? (char *)data->data : ""));
99
100         messaging_send(msg_ctx, src, MSG_PONG, data);
101 }
102
103 /****************************************************************************
104  Register/replace a dispatch function for a particular message type.
105  JRA changed Dec 13 2006. Only one message handler now permitted per type.
106  *NOTE*: Dispatch functions must be able to cope with incoming
107  messages on an *odd* byte boundary.
108 ****************************************************************************/
109
110 struct msg_all {
111         struct messaging_context *msg_ctx;
112         int msg_type;
113         uint32 msg_flag;
114         const void *buf;
115         size_t len;
116         int n_sent;
117 };
118
119 /****************************************************************************
120  Send one of the messages for the broadcast.
121 ****************************************************************************/
122
123 static int traverse_fn(struct db_record *rec, const struct server_id *id,
124                        uint32_t msg_flags, void *state)
125 {
126         struct msg_all *msg_all = (struct msg_all *)state;
127         NTSTATUS status;
128
129         /* Don't send if the receiver hasn't registered an interest. */
130
131         if((msg_flags & msg_all->msg_flag) == 0) {
132                 return 0;
133         }
134
135         /* If the msg send fails because the pid was not found (i.e. smbd died), 
136          * the msg has already been deleted from the messages.tdb.*/
137
138         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
139                                     (const uint8_t *)msg_all->buf, msg_all->len);
140
141         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
142                 struct server_id_buf idbuf;
143
144                 /*
145                  * If the pid was not found delete the entry from
146                  * serverid.tdb
147                  */
148
149                 DEBUG(2, ("pid %s doesn't exist\n",
150                           server_id_str_buf(*id, &idbuf)));
151
152                 dbwrap_record_delete(rec);
153         }
154         msg_all->n_sent++;
155         return 0;
156 }
157
158 /**
159  * Send a message to all smbd processes.
160  *
161  * It isn't very efficient, but should be OK for the sorts of
162  * applications that use it. When we need efficient broadcast we can add
163  * it.
164  *
165  * @param n_sent Set to the number of messages sent.  This should be
166  * equal to the number of processes, but be careful for races.
167  *
168  * @retval True for success.
169  **/
170 bool message_send_all(struct messaging_context *msg_ctx,
171                       int msg_type,
172                       const void *buf, size_t len,
173                       int *n_sent)
174 {
175         struct msg_all msg_all;
176
177         msg_all.msg_type = msg_type;
178         if (msg_type < 0x100) {
179                 msg_all.msg_flag = FLAG_MSG_GENERAL;
180         } else if (msg_type > 0x100 && msg_type < 0x200) {
181                 msg_all.msg_flag = FLAG_MSG_NMBD;
182         } else if (msg_type > 0x200 && msg_type < 0x300) {
183                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
184         } else if (msg_type > 0x300 && msg_type < 0x400) {
185                 msg_all.msg_flag = FLAG_MSG_SMBD;
186         } else if (msg_type > 0x400 && msg_type < 0x600) {
187                 msg_all.msg_flag = FLAG_MSG_WINBIND;
188         } else if (msg_type > 4000 && msg_type < 5000) {
189                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
190         } else {
191                 return false;
192         }
193
194         msg_all.buf = buf;
195         msg_all.len = len;
196         msg_all.n_sent = 0;
197         msg_all.msg_ctx = msg_ctx;
198
199         serverid_traverse(traverse_fn, &msg_all);
200         if (n_sent)
201                 *n_sent = msg_all.n_sent;
202         return true;
203 }
204
205 static void messaging_recv_cb(int msg_type,
206                               struct server_id src, struct server_id dst,
207                               const uint8_t *msg, size_t msg_len,
208                               void *private_data)
209 {
210         struct messaging_context *msg_ctx = talloc_get_type_abort(
211                 private_data, struct messaging_context);
212         struct messaging_rec rec;
213
214         rec = (struct messaging_rec) {
215                 .msg_version = MESSAGE_VERSION,
216                 .msg_type = msg_type,
217                 .src = src,
218                 .dest = dst,
219                 .buf.data = discard_const_p(uint8, msg),
220                 .buf.length = msg_len
221         };
222
223         messaging_dispatch_rec(msg_ctx, &rec);
224 }
225
226 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
227                                          struct tevent_context *ev)
228 {
229         struct messaging_context *ctx;
230         NTSTATUS status;
231         int ret;
232         static bool have_context = false;
233
234         if (have_context) {
235                 DEBUG(0, ("No two messaging contexts per process\n"));
236                 return NULL;
237         }
238
239
240         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
241                 return NULL;
242         }
243
244         ctx->id = procid_self();
245         ctx->event_ctx = ev;
246         ctx->have_context = &have_context;
247
248         ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
249                                  messaging_recv_cb, ctx, &ctx->local);
250
251         if (ret != 0) {
252                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
253                 TALLOC_FREE(ctx);
254                 return NULL;
255         }
256
257         if (lp_clustering()) {
258                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
259
260                 if (!NT_STATUS_IS_OK(status)) {
261                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
262                                   nt_errstr(status)));
263                         TALLOC_FREE(ctx);
264                         return NULL;
265                 }
266         }
267         ctx->id.vnn = get_my_vnn();
268
269         messaging_register(ctx, NULL, MSG_PING, ping_message);
270
271         /* Register some debugging related messages */
272
273         register_msg_pool_usage(ctx);
274         register_dmalloc_msgs(ctx);
275         debug_register_msgs(ctx);
276
277         have_context = true;
278         talloc_set_destructor(ctx, messaging_context_destructor);
279
280         return ctx;
281 }
282
283 static int messaging_context_destructor(struct messaging_context *msg_ctx)
284 {
285         SMB_ASSERT(*msg_ctx->have_context);
286         *msg_ctx->have_context = false;
287         return 0;
288 }
289
290 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
291 {
292         return msg_ctx->id;
293 }
294
295 /*
296  * re-init after a fork
297  */
298 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
299 {
300         NTSTATUS status;
301         int ret;
302
303         TALLOC_FREE(msg_ctx->local);
304
305         msg_ctx->id = procid_self();
306
307         ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx,
308                                  msg_ctx->id, messaging_recv_cb, msg_ctx,
309                                  &msg_ctx->local);
310         if (ret != 0) {
311                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
312                 return map_nt_error_from_unix(ret);
313         }
314
315         TALLOC_FREE(msg_ctx->remote);
316
317         if (lp_clustering()) {
318                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
319                                               &msg_ctx->remote);
320
321                 if (!NT_STATUS_IS_OK(status)) {
322                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
323                                   nt_errstr(status)));
324                         return status;
325                 }
326         }
327
328         return NT_STATUS_OK;
329 }
330
331
332 /*
333  * Register a dispatch function for a particular message type. Allow multiple
334  * registrants
335 */
336 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
337                             void *private_data,
338                             uint32_t msg_type,
339                             void (*fn)(struct messaging_context *msg,
340                                        void *private_data, 
341                                        uint32_t msg_type, 
342                                        struct server_id server_id,
343                                        DATA_BLOB *data))
344 {
345         struct messaging_callback *cb;
346
347         DEBUG(5, ("Registering messaging pointer for type %u - "
348                   "private_data=%p\n",
349                   (unsigned)msg_type, private_data));
350
351         /*
352          * Only one callback per type
353          */
354
355         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
356                 /* we allow a second registration of the same message
357                    type if it has a different private pointer. This is
358                    needed in, for example, the internal notify code,
359                    which creates a new notify context for each tree
360                    connect, and expects to receive messages to each of
361                    them. */
362                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
363                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
364                                   (unsigned)msg_type, private_data));
365                         cb->fn = fn;
366                         cb->private_data = private_data;
367                         return NT_STATUS_OK;
368                 }
369         }
370
371         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
372                 return NT_STATUS_NO_MEMORY;
373         }
374
375         cb->msg_type = msg_type;
376         cb->fn = fn;
377         cb->private_data = private_data;
378
379         DLIST_ADD(msg_ctx->callbacks, cb);
380         return NT_STATUS_OK;
381 }
382
383 /*
384   De-register the function for a particular message type.
385 */
386 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
387                           void *private_data)
388 {
389         struct messaging_callback *cb, *next;
390
391         for (cb = ctx->callbacks; cb; cb = next) {
392                 next = cb->next;
393                 if ((cb->msg_type == msg_type)
394                     && (cb->private_data == private_data)) {
395                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
396                                   (unsigned)msg_type, private_data));
397                         DLIST_REMOVE(ctx->callbacks, cb);
398                         TALLOC_FREE(cb);
399                 }
400         }
401 }
402
403 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
404                                    const struct server_id *dst)
405 {
406         return ((msg_ctx->id.vnn == dst->vnn) &&
407                 (msg_ctx->id.pid == dst->pid));
408 }
409
410 /*
411   Send a message to a particular server
412 */
413 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
414                         struct server_id server, uint32_t msg_type,
415                         const DATA_BLOB *data)
416 {
417         struct iovec iov;
418
419         iov.iov_base = data->data;
420         iov.iov_len = data->length;
421
422         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
423 }
424
425 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
426                             struct server_id server, uint32_t msg_type,
427                             const uint8_t *buf, size_t len)
428 {
429         DATA_BLOB blob = data_blob_const(buf, len);
430         return messaging_send(msg_ctx, server, msg_type, &blob);
431 }
432
433 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
434                             struct server_id server, uint32_t msg_type,
435                             const struct iovec *iov, int iovlen)
436 {
437         int ret;
438
439         if (server_id_is_disconnected(&server)) {
440                 return NT_STATUS_INVALID_PARAMETER_MIX;
441         }
442
443         if (!procid_is_local(&server)) {
444                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
445                                                msg_type, iov, iovlen,
446                                                msg_ctx->remote);
447                 if (ret != 0) {
448                         return map_nt_error_from_unix(ret);
449                 }
450                 return NT_STATUS_OK;
451         }
452
453         if (messaging_is_self_send(msg_ctx, &server)) {
454                 struct messaging_rec rec;
455                 uint8_t *buf;
456
457                 buf = iov_buf(talloc_tos(), iov, iovlen);
458                 if (buf == NULL) {
459                         return NT_STATUS_NO_MEMORY;
460                 }
461
462                 rec.msg_version = MESSAGE_VERSION;
463                 rec.msg_type = msg_type & MSG_TYPE_MASK;
464                 rec.dest = server;
465                 rec.src = msg_ctx->id;
466                 rec.buf = data_blob_const(buf, talloc_get_size(buf));
467                 messaging_dispatch_rec(msg_ctx, &rec);
468                 TALLOC_FREE(buf);
469                 return NT_STATUS_OK;
470         }
471
472         ret = messaging_dgm_send(msg_ctx->local, msg_ctx->id, server, msg_type,
473                                  iov, iovlen);
474         if (ret != 0) {
475                 return map_nt_error_from_unix(ret);
476         }
477         return NT_STATUS_OK;
478 }
479
480 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
481                                                struct messaging_rec *rec)
482 {
483         struct messaging_rec *result;
484
485         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
486                                       1, rec->buf.length);
487         if (result == NULL) {
488                 return NULL;
489         }
490         *result = *rec;
491
492         /* Doesn't fail, see talloc_pooled_object */
493
494         result->buf.data = talloc_memdup(result, rec->buf.data,
495                                          rec->buf.length);
496         return result;
497 }
498
499 struct messaging_filtered_read_state {
500         struct tevent_context *ev;
501         struct messaging_context *msg_ctx;
502         void *tevent_handle;
503
504         bool (*filter)(struct messaging_rec *rec, void *private_data);
505         void *private_data;
506
507         struct messaging_rec *rec;
508 };
509
510 static void messaging_filtered_read_cleanup(struct tevent_req *req,
511                                             enum tevent_req_state req_state);
512
513 struct tevent_req *messaging_filtered_read_send(
514         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
515         struct messaging_context *msg_ctx,
516         bool (*filter)(struct messaging_rec *rec, void *private_data),
517         void *private_data)
518 {
519         struct tevent_req *req;
520         struct messaging_filtered_read_state *state;
521         size_t new_waiters_len;
522
523         req = tevent_req_create(mem_ctx, &state,
524                                 struct messaging_filtered_read_state);
525         if (req == NULL) {
526                 return NULL;
527         }
528         state->ev = ev;
529         state->msg_ctx = msg_ctx;
530         state->filter = filter;
531         state->private_data = private_data;
532
533         /*
534          * We have to defer the callback here, as we might be called from
535          * within a different tevent_context than state->ev
536          */
537         tevent_req_defer_callback(req, state->ev);
538
539         state->tevent_handle = messaging_dgm_register_tevent_context(
540                 state, msg_ctx->local, ev);
541         if (tevent_req_nomem(state, req)) {
542                 return tevent_req_post(req, ev);
543         }
544
545         /*
546          * We add ourselves to the "new_waiters" array, not the "waiters"
547          * array. If we are called from within messaging_read_done,
548          * messaging_dispatch_rec will be in an active for-loop on
549          * "waiters". We must be careful not to mess with this array, because
550          * it could mean that a single event is being delivered twice.
551          */
552
553         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
554
555         if (new_waiters_len == msg_ctx->num_new_waiters) {
556                 struct tevent_req **tmp;
557
558                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
559                                      struct tevent_req *, new_waiters_len+1);
560                 if (tevent_req_nomem(tmp, req)) {
561                         return tevent_req_post(req, ev);
562                 }
563                 msg_ctx->new_waiters = tmp;
564         }
565
566         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
567         msg_ctx->num_new_waiters += 1;
568         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
569
570         return req;
571 }
572
573 static void messaging_filtered_read_cleanup(struct tevent_req *req,
574                                             enum tevent_req_state req_state)
575 {
576         struct messaging_filtered_read_state *state = tevent_req_data(
577                 req, struct messaging_filtered_read_state);
578         struct messaging_context *msg_ctx = state->msg_ctx;
579         unsigned i;
580
581         tevent_req_set_cleanup_fn(req, NULL);
582
583         TALLOC_FREE(state->tevent_handle);
584
585         /*
586          * Just set the [new_]waiters entry to NULL, be careful not to mess
587          * with the other "waiters" array contents. We are often called from
588          * within "messaging_dispatch_rec", which loops over
589          * "waiters". Messing with the "waiters" array will mess up that
590          * for-loop.
591          */
592
593         for (i=0; i<msg_ctx->num_waiters; i++) {
594                 if (msg_ctx->waiters[i] == req) {
595                         msg_ctx->waiters[i] = NULL;
596                         return;
597                 }
598         }
599
600         for (i=0; i<msg_ctx->num_new_waiters; i++) {
601                 if (msg_ctx->new_waiters[i] == req) {
602                         msg_ctx->new_waiters[i] = NULL;
603                         return;
604                 }
605         }
606 }
607
608 static void messaging_filtered_read_done(struct tevent_req *req,
609                                          struct messaging_rec *rec)
610 {
611         struct messaging_filtered_read_state *state = tevent_req_data(
612                 req, struct messaging_filtered_read_state);
613
614         state->rec = messaging_rec_dup(state, rec);
615         if (tevent_req_nomem(state->rec, req)) {
616                 return;
617         }
618         tevent_req_done(req);
619 }
620
621 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
622                                  struct messaging_rec **presult)
623 {
624         struct messaging_filtered_read_state *state = tevent_req_data(
625                 req, struct messaging_filtered_read_state);
626         int err;
627
628         if (tevent_req_is_unix_error(req, &err)) {
629                 tevent_req_received(req);
630                 return err;
631         }
632         *presult = talloc_move(mem_ctx, &state->rec);
633         return 0;
634 }
635
636 struct messaging_read_state {
637         uint32_t msg_type;
638         struct messaging_rec *rec;
639 };
640
641 static bool messaging_read_filter(struct messaging_rec *rec,
642                                   void *private_data);
643 static void messaging_read_done(struct tevent_req *subreq);
644
645 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
646                                        struct tevent_context *ev,
647                                        struct messaging_context *msg,
648                                        uint32_t msg_type)
649 {
650         struct tevent_req *req, *subreq;
651         struct messaging_read_state *state;
652
653         req = tevent_req_create(mem_ctx, &state,
654                                 struct messaging_read_state);
655         if (req == NULL) {
656                 return NULL;
657         }
658         state->msg_type = msg_type;
659
660         subreq = messaging_filtered_read_send(state, ev, msg,
661                                               messaging_read_filter, state);
662         if (tevent_req_nomem(subreq, req)) {
663                 return tevent_req_post(req, ev);
664         }
665         tevent_req_set_callback(subreq, messaging_read_done, req);
666         return req;
667 }
668
669 static bool messaging_read_filter(struct messaging_rec *rec,
670                                   void *private_data)
671 {
672         struct messaging_read_state *state = talloc_get_type_abort(
673                 private_data, struct messaging_read_state);
674
675         return rec->msg_type == state->msg_type;
676 }
677
678 static void messaging_read_done(struct tevent_req *subreq)
679 {
680         struct tevent_req *req = tevent_req_callback_data(
681                 subreq, struct tevent_req);
682         struct messaging_read_state *state = tevent_req_data(
683                 req, struct messaging_read_state);
684         int ret;
685
686         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
687         TALLOC_FREE(subreq);
688         if (tevent_req_error(req, ret)) {
689                 return;
690         }
691         tevent_req_done(req);
692 }
693
694 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
695                         struct messaging_rec **presult)
696 {
697         struct messaging_read_state *state = tevent_req_data(
698                 req, struct messaging_read_state);
699         int err;
700
701         if (tevent_req_is_unix_error(req, &err)) {
702                 return err;
703         }
704         if (presult != NULL) {
705                 *presult = talloc_move(mem_ctx, &state->rec);
706         }
707         return 0;
708 }
709
710 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
711 {
712         if (msg_ctx->num_new_waiters == 0) {
713                 return true;
714         }
715
716         if (talloc_array_length(msg_ctx->waiters) <
717             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
718                 struct tevent_req **tmp;
719                 tmp = talloc_realloc(
720                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
721                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
722                 if (tmp == NULL) {
723                         DEBUG(1, ("%s: talloc failed\n", __func__));
724                         return false;
725                 }
726                 msg_ctx->waiters = tmp;
727         }
728
729         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
730                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
731
732         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
733         msg_ctx->num_new_waiters = 0;
734
735         return true;
736 }
737
738 struct messaging_defer_callback_state {
739         struct messaging_context *msg_ctx;
740         struct messaging_rec *rec;
741         void (*fn)(struct messaging_context *msg, void *private_data,
742                    uint32_t msg_type, struct server_id server_id,
743                    DATA_BLOB *data);
744         void *private_data;
745 };
746
747 static void messaging_defer_callback_trigger(struct tevent_context *ev,
748                                              struct tevent_immediate *im,
749                                              void *private_data);
750
751 static void messaging_defer_callback(
752         struct messaging_context *msg_ctx, struct messaging_rec *rec,
753         void (*fn)(struct messaging_context *msg, void *private_data,
754                    uint32_t msg_type, struct server_id server_id,
755                    DATA_BLOB *data),
756         void *private_data)
757 {
758         struct messaging_defer_callback_state *state;
759         struct tevent_immediate *im;
760
761         state = talloc(msg_ctx, struct messaging_defer_callback_state);
762         if (state == NULL) {
763                 DEBUG(1, ("talloc failed\n"));
764                 return;
765         }
766         state->msg_ctx = msg_ctx;
767         state->fn = fn;
768         state->private_data = private_data;
769
770         state->rec = messaging_rec_dup(state, rec);
771         if (state->rec == NULL) {
772                 DEBUG(1, ("talloc failed\n"));
773                 TALLOC_FREE(state);
774                 return;
775         }
776
777         im = tevent_create_immediate(state);
778         if (im == NULL) {
779                 DEBUG(1, ("tevent_create_immediate failed\n"));
780                 TALLOC_FREE(state);
781                 return;
782         }
783         tevent_schedule_immediate(im, msg_ctx->event_ctx,
784                                   messaging_defer_callback_trigger, state);
785 }
786
787 static void messaging_defer_callback_trigger(struct tevent_context *ev,
788                                              struct tevent_immediate *im,
789                                              void *private_data)
790 {
791         struct messaging_defer_callback_state *state = talloc_get_type_abort(
792                 private_data, struct messaging_defer_callback_state);
793         struct messaging_rec *rec = state->rec;
794
795         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
796                   &rec->buf);
797         TALLOC_FREE(state);
798 }
799
800 /*
801   Dispatch one messaging_rec
802 */
803 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
804                             struct messaging_rec *rec)
805 {
806         struct messaging_callback *cb, *next;
807         unsigned i;
808
809         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
810                 next = cb->next;
811                 if (cb->msg_type != rec->msg_type) {
812                         continue;
813                 }
814
815                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
816                         /*
817                          * This is a self-send. We are called here from
818                          * messaging_send(), and we don't want to directly
819                          * recurse into the callback but go via a
820                          * tevent_loop_once
821                          */
822                         messaging_defer_callback(msg_ctx, rec, cb->fn,
823                                                  cb->private_data);
824                 } else {
825                         /*
826                          * This comes from a different process. we are called
827                          * from the event loop, so we should call back
828                          * directly.
829                          */
830                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
831                                rec->src, &rec->buf);
832                 }
833                 /*
834                  * we continue looking for matching messages after finding
835                  * one. This matters for subsystems like the internal notify
836                  * code which register more than one handler for the same
837                  * message type
838                  */
839         }
840
841         if (!messaging_append_new_waiters(msg_ctx)) {
842                 return;
843         }
844
845         i = 0;
846         while (i < msg_ctx->num_waiters) {
847                 struct tevent_req *req;
848                 struct messaging_filtered_read_state *state;
849
850                 req = msg_ctx->waiters[i];
851                 if (req == NULL) {
852                         /*
853                          * This got cleaned up. In the meantime,
854                          * move everything down one. We need
855                          * to keep the order of waiters, as
856                          * other code may depend on this.
857                          */
858                         if (i < msg_ctx->num_waiters - 1) {
859                                 memmove(&msg_ctx->waiters[i],
860                                         &msg_ctx->waiters[i+1],
861                                         sizeof(struct tevent_req *) *
862                                             (msg_ctx->num_waiters - i - 1));
863                         }
864                         msg_ctx->num_waiters -= 1;
865                         continue;
866                 }
867
868                 state = tevent_req_data(
869                         req, struct messaging_filtered_read_state);
870                 if (state->filter(rec, state->private_data)) {
871                         messaging_filtered_read_done(req, rec);
872                 }
873
874                 i += 1;
875         }
876 }
877
878 static int mess_parent_dgm_cleanup(void *private_data);
879 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
880
881 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
882 {
883         struct tevent_req *req;
884
885         req = background_job_send(
886                 msg, msg->event_ctx, msg, NULL, 0,
887                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
888                             60*15),
889                 mess_parent_dgm_cleanup, msg);
890         if (req == NULL) {
891                 return false;
892         }
893         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
894         return true;
895 }
896
897 static int mess_parent_dgm_cleanup(void *private_data)
898 {
899         struct messaging_context *msg_ctx = talloc_get_type_abort(
900                 private_data, struct messaging_context);
901         int ret;
902
903         ret = messaging_dgm_wipe(msg_ctx->local);
904         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
905                    ret ? strerror(ret) : "ok"));
906         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
907                            60*15);
908 }
909
910 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
911 {
912         struct messaging_context *msg = tevent_req_callback_data(
913                 req, struct messaging_context);
914         NTSTATUS status;
915
916         status = background_job_recv(req);
917         TALLOC_FREE(req);
918         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
919                   nt_errstr(status)));
920
921         req = background_job_send(
922                 msg, msg->event_ctx, msg, NULL, 0,
923                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
924                             60*15),
925                 mess_parent_dgm_cleanup, msg);
926         if (req == NULL) {
927                 DEBUG(1, ("background_job_send failed\n"));
928         }
929         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
930 }
931
932 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
933 {
934         int ret;
935
936         if (pid == 0) {
937                 ret = messaging_dgm_wipe(msg_ctx->local);
938         } else {
939                 ret = messaging_dgm_cleanup(msg_ctx->local, pid);
940         }
941
942         return ret;
943 }
944
945 struct tevent_context *messaging_tevent_context(
946         struct messaging_context *msg_ctx)
947 {
948         return msg_ctx->event_ctx;
949 }
950
951 /** @} **/