messaging3: Fix whitespace
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 /****************************************************************************
65  A useful function for testing the message system.
66 ****************************************************************************/
67
68 static void ping_message(struct messaging_context *msg_ctx,
69                          void *private_data,
70                          uint32_t msg_type,
71                          struct server_id src,
72                          DATA_BLOB *data)
73 {
74         const char *msg = "none";
75         char *free_me = NULL;
76
77         if (data->data != NULL) {
78                 free_me = talloc_strndup(talloc_tos(), (char *)data->data,
79                                          data->length);
80                 msg = free_me;
81         }
82         DEBUG(1,("INFO: Received PING message from PID %s [%s]\n",
83                  procid_str_static(&src), msg));
84         TALLOC_FREE(free_me);
85         messaging_send(msg_ctx, src, MSG_PONG, data);
86 }
87
88 /****************************************************************************
89  Register/replace a dispatch function for a particular message type.
90  JRA changed Dec 13 2006. Only one message handler now permitted per type.
91  *NOTE*: Dispatch functions must be able to cope with incoming
92  messages on an *odd* byte boundary.
93 ****************************************************************************/
94
95 struct msg_all {
96         struct messaging_context *msg_ctx;
97         int msg_type;
98         uint32 msg_flag;
99         const void *buf;
100         size_t len;
101         int n_sent;
102 };
103
104 /****************************************************************************
105  Send one of the messages for the broadcast.
106 ****************************************************************************/
107
108 static int traverse_fn(struct db_record *rec, const struct server_id *id,
109                        uint32_t msg_flags, void *state)
110 {
111         struct msg_all *msg_all = (struct msg_all *)state;
112         NTSTATUS status;
113
114         /* Don't send if the receiver hasn't registered an interest. */
115
116         if((msg_flags & msg_all->msg_flag) == 0) {
117                 return 0;
118         }
119
120         /* If the msg send fails because the pid was not found (i.e. smbd died), 
121          * the msg has already been deleted from the messages.tdb.*/
122
123         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
124                                     (const uint8_t *)msg_all->buf, msg_all->len);
125
126         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
127
128                 /*
129                  * If the pid was not found delete the entry from
130                  * serverid.tdb
131                  */
132
133                 DEBUG(2, ("pid %s doesn't exist\n", procid_str_static(id)));
134
135                 dbwrap_record_delete(rec);
136         }
137         msg_all->n_sent++;
138         return 0;
139 }
140
141 /**
142  * Send a message to all smbd processes.
143  *
144  * It isn't very efficient, but should be OK for the sorts of
145  * applications that use it. When we need efficient broadcast we can add
146  * it.
147  *
148  * @param n_sent Set to the number of messages sent.  This should be
149  * equal to the number of processes, but be careful for races.
150  *
151  * @retval True for success.
152  **/
153 bool message_send_all(struct messaging_context *msg_ctx,
154                       int msg_type,
155                       const void *buf, size_t len,
156                       int *n_sent)
157 {
158         struct msg_all msg_all;
159
160         msg_all.msg_type = msg_type;
161         if (msg_type < 0x100) {
162                 msg_all.msg_flag = FLAG_MSG_GENERAL;
163         } else if (msg_type > 0x100 && msg_type < 0x200) {
164                 msg_all.msg_flag = FLAG_MSG_NMBD;
165         } else if (msg_type > 0x200 && msg_type < 0x300) {
166                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
167         } else if (msg_type > 0x300 && msg_type < 0x400) {
168                 msg_all.msg_flag = FLAG_MSG_SMBD;
169         } else if (msg_type > 0x400 && msg_type < 0x600) {
170                 msg_all.msg_flag = FLAG_MSG_WINBIND;
171         } else if (msg_type > 4000 && msg_type < 5000) {
172                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
173         } else {
174                 return false;
175         }
176
177         msg_all.buf = buf;
178         msg_all.len = len;
179         msg_all.n_sent = 0;
180         msg_all.msg_ctx = msg_ctx;
181
182         serverid_traverse(traverse_fn, &msg_all);
183         if (n_sent)
184                 *n_sent = msg_all.n_sent;
185         return true;
186 }
187
188 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
189                                          struct tevent_context *ev)
190 {
191         struct messaging_context *ctx;
192         NTSTATUS status;
193
194         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
195                 return NULL;
196         }
197
198         ctx->id = procid_self();
199         ctx->event_ctx = ev;
200
201         status = messaging_dgm_init(ctx, ctx, &ctx->local);
202
203         if (!NT_STATUS_IS_OK(status)) {
204                 DEBUG(2, ("messaging_dgm_init failed: %s\n",
205                           nt_errstr(status)));
206                 TALLOC_FREE(ctx);
207                 return NULL;
208         }
209
210         if (lp_clustering()) {
211                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
212
213                 if (!NT_STATUS_IS_OK(status)) {
214                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
215                                   nt_errstr(status)));
216                         TALLOC_FREE(ctx);
217                         return NULL;
218                 }
219         }
220         ctx->id.vnn = get_my_vnn();
221
222         messaging_register(ctx, NULL, MSG_PING, ping_message);
223
224         /* Register some debugging related messages */
225
226         register_msg_pool_usage(ctx);
227         register_dmalloc_msgs(ctx);
228         debug_register_msgs(ctx);
229
230         return ctx;
231 }
232
233 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
234 {
235         return msg_ctx->id;
236 }
237
238 /*
239  * re-init after a fork
240  */
241 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
242 {
243         NTSTATUS status;
244
245         TALLOC_FREE(msg_ctx->local);
246
247         msg_ctx->id = procid_self();
248
249         status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
250         if (!NT_STATUS_IS_OK(status)) {
251                 DEBUG(0, ("messaging_dgm_init failed: %s\n",
252                           nt_errstr(status)));
253                 return status;
254         }
255
256         TALLOC_FREE(msg_ctx->remote);
257
258         if (lp_clustering()) {
259                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
260                                               &msg_ctx->remote);
261
262                 if (!NT_STATUS_IS_OK(status)) {
263                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
264                                   nt_errstr(status)));
265                         return status;
266                 }
267         }
268
269         return NT_STATUS_OK;
270 }
271
272
273 /*
274  * Register a dispatch function for a particular message type. Allow multiple
275  * registrants
276 */
277 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
278                             void *private_data,
279                             uint32_t msg_type,
280                             void (*fn)(struct messaging_context *msg,
281                                        void *private_data, 
282                                        uint32_t msg_type, 
283                                        struct server_id server_id,
284                                        DATA_BLOB *data))
285 {
286         struct messaging_callback *cb;
287
288         DEBUG(5, ("Registering messaging pointer for type %u - "
289                   "private_data=%p\n",
290                   (unsigned)msg_type, private_data));
291
292         /*
293          * Only one callback per type
294          */
295
296         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
297                 /* we allow a second registration of the same message
298                    type if it has a different private pointer. This is
299                    needed in, for example, the internal notify code,
300                    which creates a new notify context for each tree
301                    connect, and expects to receive messages to each of
302                    them. */
303                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
304                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
305                                   (unsigned)msg_type, private_data));
306                         cb->fn = fn;
307                         cb->private_data = private_data;
308                         return NT_STATUS_OK;
309                 }
310         }
311
312         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
313                 return NT_STATUS_NO_MEMORY;
314         }
315
316         cb->msg_type = msg_type;
317         cb->fn = fn;
318         cb->private_data = private_data;
319
320         DLIST_ADD(msg_ctx->callbacks, cb);
321         return NT_STATUS_OK;
322 }
323
324 /*
325   De-register the function for a particular message type.
326 */
327 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
328                           void *private_data)
329 {
330         struct messaging_callback *cb, *next;
331
332         for (cb = ctx->callbacks; cb; cb = next) {
333                 next = cb->next;
334                 if ((cb->msg_type == msg_type)
335                     && (cb->private_data == private_data)) {
336                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
337                                   (unsigned)msg_type, private_data));
338                         DLIST_REMOVE(ctx->callbacks, cb);
339                         TALLOC_FREE(cb);
340                 }
341         }
342 }
343
344 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
345                                    const struct server_id *dst)
346 {
347         return ((msg_ctx->id.vnn == dst->vnn) &&
348                 (msg_ctx->id.pid == dst->pid));
349 }
350
351 /*
352   Send a message to a particular server
353 */
354 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
355                         struct server_id server, uint32_t msg_type,
356                         const DATA_BLOB *data)
357 {
358         if (server_id_is_disconnected(&server)) {
359                 return NT_STATUS_INVALID_PARAMETER_MIX;
360         }
361
362         if (!procid_is_local(&server)) {
363                 return msg_ctx->remote->send_fn(msg_ctx, server,
364                                                 msg_type, data,
365                                                 msg_ctx->remote);
366         }
367
368         if (messaging_is_self_send(msg_ctx, &server)) {
369                 struct messaging_rec rec;
370                 rec.msg_version = MESSAGE_VERSION;
371                 rec.msg_type = msg_type & MSG_TYPE_MASK;
372                 rec.dest = server;
373                 rec.src = msg_ctx->id;
374                 rec.buf = *data;
375                 messaging_dispatch_rec(msg_ctx, &rec);
376                 return NT_STATUS_OK;
377         }
378
379         return msg_ctx->local->send_fn(msg_ctx, server, msg_type, data,
380                                        msg_ctx->local);
381 }
382
383 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
384                             struct server_id server, uint32_t msg_type,
385                             const uint8_t *buf, size_t len)
386 {
387         DATA_BLOB blob = data_blob_const(buf, len);
388         return messaging_send(msg_ctx, server, msg_type, &blob);
389 }
390
391 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
392                             struct server_id server, uint32_t msg_type,
393                             const struct iovec *iov, int iovlen)
394 {
395         uint8_t *buf;
396         NTSTATUS status;
397
398         buf = iov_buf(talloc_tos(), iov, iovlen);
399         if (buf == NULL) {
400                 return NT_STATUS_NO_MEMORY;
401         }
402
403         status = messaging_send_buf(msg_ctx, server, msg_type,
404                                     buf, talloc_get_size(buf));
405
406         TALLOC_FREE(buf);
407         return status;
408 }
409
410 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
411                                                struct messaging_rec *rec)
412 {
413         struct messaging_rec *result;
414
415         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
416                                       1, rec->buf.length);
417         if (result == NULL) {
418                 return NULL;
419         }
420         *result = *rec;
421
422         /* Doesn't fail, see talloc_pooled_object */
423
424         result->buf.data = talloc_memdup(result, rec->buf.data,
425                                          rec->buf.length);
426         return result;
427 }
428
429 struct messaging_filtered_read_state {
430         struct tevent_context *ev;
431         struct messaging_context *msg_ctx;
432         void *tevent_handle;
433
434         bool (*filter)(struct messaging_rec *rec, void *private_data);
435         void *private_data;
436
437         struct messaging_rec *rec;
438 };
439
440 static void messaging_filtered_read_cleanup(struct tevent_req *req,
441                                             enum tevent_req_state req_state);
442
443 struct tevent_req *messaging_filtered_read_send(
444         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
445         struct messaging_context *msg_ctx,
446         bool (*filter)(struct messaging_rec *rec, void *private_data),
447         void *private_data)
448 {
449         struct tevent_req *req;
450         struct messaging_filtered_read_state *state;
451         size_t new_waiters_len;
452
453         req = tevent_req_create(mem_ctx, &state,
454                                 struct messaging_filtered_read_state);
455         if (req == NULL) {
456                 return NULL;
457         }
458         state->ev = ev;
459         state->msg_ctx = msg_ctx;
460         state->filter = filter;
461         state->private_data = private_data;
462
463         /*
464          * We have to defer the callback here, as we might be called from
465          * within a different tevent_context than state->ev
466          */
467         tevent_req_defer_callback(req, state->ev);
468
469         state->tevent_handle = messaging_dgm_register_tevent_context(
470                 state, msg_ctx, ev);
471         if (tevent_req_nomem(state, req)) {
472                 return tevent_req_post(req, ev);
473         }
474
475         /*
476          * We add ourselves to the "new_waiters" array, not the "waiters"
477          * array. If we are called from within messaging_read_done,
478          * messaging_dispatch_rec will be in an active for-loop on
479          * "waiters". We must be careful not to mess with this array, because
480          * it could mean that a single event is being delivered twice.
481          */
482
483         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
484
485         if (new_waiters_len == msg_ctx->num_new_waiters) {
486                 struct tevent_req **tmp;
487
488                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
489                                      struct tevent_req *, new_waiters_len+1);
490                 if (tevent_req_nomem(tmp, req)) {
491                         return tevent_req_post(req, ev);
492                 }
493                 msg_ctx->new_waiters = tmp;
494         }
495
496         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
497         msg_ctx->num_new_waiters += 1;
498         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
499
500         return req;
501 }
502
503 static void messaging_filtered_read_cleanup(struct tevent_req *req,
504                                             enum tevent_req_state req_state)
505 {
506         struct messaging_filtered_read_state *state = tevent_req_data(
507                 req, struct messaging_filtered_read_state);
508         struct messaging_context *msg_ctx = state->msg_ctx;
509         unsigned i;
510
511         tevent_req_set_cleanup_fn(req, NULL);
512
513         TALLOC_FREE(state->tevent_handle);
514
515         /*
516          * Just set the [new_]waiters entry to NULL, be careful not to mess
517          * with the other "waiters" array contents. We are often called from
518          * within "messaging_dispatch_rec", which loops over
519          * "waiters". Messing with the "waiters" array will mess up that
520          * for-loop.
521          */
522
523         for (i=0; i<msg_ctx->num_waiters; i++) {
524                 if (msg_ctx->waiters[i] == req) {
525                         msg_ctx->waiters[i] = NULL;
526                         return;
527                 }
528         }
529
530         for (i=0; i<msg_ctx->num_new_waiters; i++) {
531                 if (msg_ctx->new_waiters[i] == req) {
532                         msg_ctx->new_waiters[i] = NULL;
533                         return;
534                 }
535         }
536 }
537
538 static void messaging_filtered_read_done(struct tevent_req *req,
539                                          struct messaging_rec *rec)
540 {
541         struct messaging_filtered_read_state *state = tevent_req_data(
542                 req, struct messaging_filtered_read_state);
543
544         state->rec = messaging_rec_dup(state, rec);
545         if (tevent_req_nomem(state->rec, req)) {
546                 return;
547         }
548         tevent_req_done(req);
549 }
550
551 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
552                                  struct messaging_rec **presult)
553 {
554         struct messaging_filtered_read_state *state = tevent_req_data(
555                 req, struct messaging_filtered_read_state);
556         int err;
557
558         if (tevent_req_is_unix_error(req, &err)) {
559                 tevent_req_received(req);
560                 return err;
561         }
562         *presult = talloc_move(mem_ctx, &state->rec);
563         return 0;
564 }
565
566 struct messaging_read_state {
567         uint32_t msg_type;
568         struct messaging_rec *rec;
569 };
570
571 static bool messaging_read_filter(struct messaging_rec *rec,
572                                   void *private_data);
573 static void messaging_read_done(struct tevent_req *subreq);
574
575 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
576                                        struct tevent_context *ev,
577                                        struct messaging_context *msg,
578                                        uint32_t msg_type)
579 {
580         struct tevent_req *req, *subreq;
581         struct messaging_read_state *state;
582
583         req = tevent_req_create(mem_ctx, &state,
584                                 struct messaging_read_state);
585         if (req == NULL) {
586                 return NULL;
587         }
588         state->msg_type = msg_type;
589
590         subreq = messaging_filtered_read_send(state, ev, msg,
591                                               messaging_read_filter, state);
592         if (tevent_req_nomem(subreq, req)) {
593                 return tevent_req_post(req, ev);
594         }
595         tevent_req_set_callback(subreq, messaging_read_done, req);
596         return req;
597 }
598
599 static bool messaging_read_filter(struct messaging_rec *rec,
600                                   void *private_data)
601 {
602         struct messaging_read_state *state = talloc_get_type_abort(
603                 private_data, struct messaging_read_state);
604
605         return rec->msg_type == state->msg_type;
606 }
607
608 static void messaging_read_done(struct tevent_req *subreq)
609 {
610         struct tevent_req *req = tevent_req_callback_data(
611                 subreq, struct tevent_req);
612         struct messaging_read_state *state = tevent_req_data(
613                 req, struct messaging_read_state);
614         int ret;
615
616         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
617         TALLOC_FREE(subreq);
618         if (tevent_req_error(req, ret)) {
619                 return;
620         }
621         tevent_req_done(req);
622 }
623
624 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
625                         struct messaging_rec **presult)
626 {
627         struct messaging_read_state *state = tevent_req_data(
628                 req, struct messaging_read_state);
629         int err;
630
631         if (tevent_req_is_unix_error(req, &err)) {
632                 return err;
633         }
634         if (presult != NULL) {
635                 *presult = talloc_move(mem_ctx, &state->rec);
636         }
637         return 0;
638 }
639
640 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
641 {
642         if (msg_ctx->num_new_waiters == 0) {
643                 return true;
644         }
645
646         if (talloc_array_length(msg_ctx->waiters) <
647             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
648                 struct tevent_req **tmp;
649                 tmp = talloc_realloc(
650                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
651                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
652                 if (tmp == NULL) {
653                         DEBUG(1, ("%s: talloc failed\n", __func__));
654                         return false;
655                 }
656                 msg_ctx->waiters = tmp;
657         }
658
659         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
660                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
661
662         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
663         msg_ctx->num_new_waiters = 0;
664
665         return true;
666 }
667
668 struct messaging_defer_callback_state {
669         struct messaging_context *msg_ctx;
670         struct messaging_rec *rec;
671         void (*fn)(struct messaging_context *msg, void *private_data,
672                    uint32_t msg_type, struct server_id server_id,
673                    DATA_BLOB *data);
674         void *private_data;
675 };
676
677 static void messaging_defer_callback_trigger(struct tevent_context *ev,
678                                              struct tevent_immediate *im,
679                                              void *private_data);
680
681 static void messaging_defer_callback(
682         struct messaging_context *msg_ctx, struct messaging_rec *rec,
683         void (*fn)(struct messaging_context *msg, void *private_data,
684                    uint32_t msg_type, struct server_id server_id,
685                    DATA_BLOB *data),
686         void *private_data)
687 {
688         struct messaging_defer_callback_state *state;
689         struct tevent_immediate *im;
690
691         state = talloc(msg_ctx, struct messaging_defer_callback_state);
692         if (state == NULL) {
693                 DEBUG(1, ("talloc failed\n"));
694                 return;
695         }
696         state->msg_ctx = msg_ctx;
697         state->fn = fn;
698         state->private_data = private_data;
699
700         state->rec = messaging_rec_dup(state, rec);
701         if (state->rec == NULL) {
702                 DEBUG(1, ("talloc failed\n"));
703                 TALLOC_FREE(state);
704                 return;
705         }
706
707         im = tevent_create_immediate(state);
708         if (im == NULL) {
709                 DEBUG(1, ("tevent_create_immediate failed\n"));
710                 TALLOC_FREE(state);
711                 return;
712         }
713         tevent_schedule_immediate(im, msg_ctx->event_ctx,
714                                   messaging_defer_callback_trigger, state);
715 }
716
717 static void messaging_defer_callback_trigger(struct tevent_context *ev,
718                                              struct tevent_immediate *im,
719                                              void *private_data)
720 {
721         struct messaging_defer_callback_state *state = talloc_get_type_abort(
722                 private_data, struct messaging_defer_callback_state);
723         struct messaging_rec *rec = state->rec;
724
725         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
726                   &rec->buf);
727         TALLOC_FREE(state);
728 }
729
730 /*
731   Dispatch one messaging_rec
732 */
733 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
734                             struct messaging_rec *rec)
735 {
736         struct messaging_callback *cb, *next;
737         unsigned i;
738
739         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
740                 next = cb->next;
741                 if (cb->msg_type != rec->msg_type) {
742                         continue;
743                 }
744
745                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
746                         /*
747                          * This is a self-send. We are called here from
748                          * messaging_send(), and we don't want to directly
749                          * recurse into the callback but go via a
750                          * tevent_loop_once
751                          */
752                         messaging_defer_callback(msg_ctx, rec, cb->fn,
753                                                  cb->private_data);
754                 } else {
755                         /*
756                          * This comes from a different process. we are called
757                          * from the event loop, so we should call back
758                          * directly.
759                          */
760                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
761                                rec->src, &rec->buf);
762                 }
763                 /*
764                  * we continue looking for matching messages after finding
765                  * one. This matters for subsystems like the internal notify
766                  * code which register more than one handler for the same
767                  * message type
768                  */
769         }
770
771         if (!messaging_append_new_waiters(msg_ctx)) {
772                 return;
773         }
774
775         i = 0;
776         while (i < msg_ctx->num_waiters) {
777                 struct tevent_req *req;
778                 struct messaging_filtered_read_state *state;
779
780                 req = msg_ctx->waiters[i];
781                 if (req == NULL) {
782                         /*
783                          * This got cleaned up. In the meantime,
784                          * move everything down one. We need
785                          * to keep the order of waiters, as
786                          * other code may depend on this.
787                          */
788                         if (i < msg_ctx->num_waiters - 1) {
789                                 memmove(&msg_ctx->waiters[i],
790                                         &msg_ctx->waiters[i+1],
791                                         sizeof(struct tevent_req *) *
792                                             (msg_ctx->num_waiters - i - 1));
793                         }
794                         msg_ctx->num_waiters -= 1;
795                         continue;
796                 }
797
798                 state = tevent_req_data(
799                         req, struct messaging_filtered_read_state);
800                 if (state->filter(rec, state->private_data)) {
801                         messaging_filtered_read_done(req, rec);
802                 }
803
804                 i += 1;
805         }
806         return;
807 }
808
809 static int mess_parent_dgm_cleanup(void *private_data);
810 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
811
812 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
813 {
814         struct tevent_req *req;
815
816         req = background_job_send(
817                 msg, msg->event_ctx, msg, NULL, 0,
818                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
819                             60*15),
820                 mess_parent_dgm_cleanup, msg);
821         if (req == NULL) {
822                 return false;
823         }
824         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
825         return true;
826 }
827
828 static int mess_parent_dgm_cleanup(void *private_data)
829 {
830         struct messaging_context *msg_ctx = talloc_get_type_abort(
831                 private_data, struct messaging_context);
832         NTSTATUS status;
833
834         status = messaging_dgm_wipe(msg_ctx);
835         DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
836         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
837                            60*15);
838 }
839
840 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
841 {
842         struct messaging_context *msg = tevent_req_callback_data(
843                 req, struct messaging_context);
844         NTSTATUS status;
845
846         status = background_job_recv(req);
847         TALLOC_FREE(req);
848         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
849                   nt_errstr(status)));
850
851         req = background_job_send(
852                 msg, msg->event_ctx, msg, NULL, 0,
853                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
854                             60*15),
855                 mess_parent_dgm_cleanup, msg);
856         if (req == NULL) {
857                 DEBUG(1, ("background_job_send failed\n"));
858         }
859         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
860 }
861
862 /** @} **/