messaging3: Add messaging_cleanup
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_backend *local;
76         struct messaging_backend *remote;
77
78         bool *have_context;
79 };
80
81 static int messaging_context_destructor(struct messaging_context *msg_ctx);
82
83 /****************************************************************************
84  A useful function for testing the message system.
85 ****************************************************************************/
86
87 static void ping_message(struct messaging_context *msg_ctx,
88                          void *private_data,
89                          uint32_t msg_type,
90                          struct server_id src,
91                          DATA_BLOB *data)
92 {
93         struct server_id_buf idbuf;
94
95         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
96                   server_id_str_buf(src, &idbuf), (int)data->length,
97                   data->data ? (char *)data->data : ""));
98
99         messaging_send(msg_ctx, src, MSG_PONG, data);
100 }
101
102 /****************************************************************************
103  Register/replace a dispatch function for a particular message type.
104  JRA changed Dec 13 2006. Only one message handler now permitted per type.
105  *NOTE*: Dispatch functions must be able to cope with incoming
106  messages on an *odd* byte boundary.
107 ****************************************************************************/
108
109 struct msg_all {
110         struct messaging_context *msg_ctx;
111         int msg_type;
112         uint32 msg_flag;
113         const void *buf;
114         size_t len;
115         int n_sent;
116 };
117
118 /****************************************************************************
119  Send one of the messages for the broadcast.
120 ****************************************************************************/
121
122 static int traverse_fn(struct db_record *rec, const struct server_id *id,
123                        uint32_t msg_flags, void *state)
124 {
125         struct msg_all *msg_all = (struct msg_all *)state;
126         NTSTATUS status;
127
128         /* Don't send if the receiver hasn't registered an interest. */
129
130         if((msg_flags & msg_all->msg_flag) == 0) {
131                 return 0;
132         }
133
134         /* If the msg send fails because the pid was not found (i.e. smbd died), 
135          * the msg has already been deleted from the messages.tdb.*/
136
137         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
138                                     (const uint8_t *)msg_all->buf, msg_all->len);
139
140         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
141                 struct server_id_buf idbuf;
142
143                 /*
144                  * If the pid was not found delete the entry from
145                  * serverid.tdb
146                  */
147
148                 DEBUG(2, ("pid %s doesn't exist\n",
149                           server_id_str_buf(*id, &idbuf)));
150
151                 dbwrap_record_delete(rec);
152         }
153         msg_all->n_sent++;
154         return 0;
155 }
156
157 /**
158  * Send a message to all smbd processes.
159  *
160  * It isn't very efficient, but should be OK for the sorts of
161  * applications that use it. When we need efficient broadcast we can add
162  * it.
163  *
164  * @param n_sent Set to the number of messages sent.  This should be
165  * equal to the number of processes, but be careful for races.
166  *
167  * @retval True for success.
168  **/
169 bool message_send_all(struct messaging_context *msg_ctx,
170                       int msg_type,
171                       const void *buf, size_t len,
172                       int *n_sent)
173 {
174         struct msg_all msg_all;
175
176         msg_all.msg_type = msg_type;
177         if (msg_type < 0x100) {
178                 msg_all.msg_flag = FLAG_MSG_GENERAL;
179         } else if (msg_type > 0x100 && msg_type < 0x200) {
180                 msg_all.msg_flag = FLAG_MSG_NMBD;
181         } else if (msg_type > 0x200 && msg_type < 0x300) {
182                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
183         } else if (msg_type > 0x300 && msg_type < 0x400) {
184                 msg_all.msg_flag = FLAG_MSG_SMBD;
185         } else if (msg_type > 0x400 && msg_type < 0x600) {
186                 msg_all.msg_flag = FLAG_MSG_WINBIND;
187         } else if (msg_type > 4000 && msg_type < 5000) {
188                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
189         } else {
190                 return false;
191         }
192
193         msg_all.buf = buf;
194         msg_all.len = len;
195         msg_all.n_sent = 0;
196         msg_all.msg_ctx = msg_ctx;
197
198         serverid_traverse(traverse_fn, &msg_all);
199         if (n_sent)
200                 *n_sent = msg_all.n_sent;
201         return true;
202 }
203
204 static void messaging_recv_cb(int msg_type,
205                               struct server_id src, struct server_id dst,
206                               const uint8_t *msg, size_t msg_len,
207                               void *private_data)
208 {
209         struct messaging_context *msg_ctx = talloc_get_type_abort(
210                 private_data, struct messaging_context);
211         struct messaging_rec rec;
212
213         rec = (struct messaging_rec) {
214                 .msg_version = MESSAGE_VERSION,
215                 .msg_type = msg_type,
216                 .src = src,
217                 .dest = dst,
218                 .buf.data = discard_const_p(uint8, msg),
219                 .buf.length = msg_len
220         };
221
222         messaging_dispatch_rec(msg_ctx, &rec);
223 }
224
225 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
226                                          struct tevent_context *ev)
227 {
228         struct messaging_context *ctx;
229         NTSTATUS status;
230         int ret;
231         static bool have_context = false;
232
233         if (have_context) {
234                 DEBUG(0, ("No two messaging contexts per process\n"));
235                 return NULL;
236         }
237
238
239         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
240                 return NULL;
241         }
242
243         ctx->id = procid_self();
244         ctx->event_ctx = ev;
245         ctx->have_context = &have_context;
246
247         ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
248                                  &ctx->local, messaging_recv_cb, ctx);
249
250         if (ret != 0) {
251                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
252                 TALLOC_FREE(ctx);
253                 return NULL;
254         }
255
256         if (lp_clustering()) {
257                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
258
259                 if (!NT_STATUS_IS_OK(status)) {
260                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
261                                   nt_errstr(status)));
262                         TALLOC_FREE(ctx);
263                         return NULL;
264                 }
265         }
266         ctx->id.vnn = get_my_vnn();
267
268         messaging_register(ctx, NULL, MSG_PING, ping_message);
269
270         /* Register some debugging related messages */
271
272         register_msg_pool_usage(ctx);
273         register_dmalloc_msgs(ctx);
274         debug_register_msgs(ctx);
275
276         have_context = true;
277         talloc_set_destructor(ctx, messaging_context_destructor);
278
279         return ctx;
280 }
281
282 static int messaging_context_destructor(struct messaging_context *msg_ctx)
283 {
284         SMB_ASSERT(*msg_ctx->have_context);
285         *msg_ctx->have_context = false;
286         return 0;
287 }
288
289 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
290 {
291         return msg_ctx->id;
292 }
293
294 /*
295  * re-init after a fork
296  */
297 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
298 {
299         NTSTATUS status;
300         int ret;
301
302         TALLOC_FREE(msg_ctx->local);
303
304         msg_ctx->id = procid_self();
305
306         ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx,
307                                  msg_ctx->id, &msg_ctx->local,
308                                  messaging_recv_cb, msg_ctx);
309         if (ret != 0) {
310                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
311                 return map_nt_error_from_unix(ret);
312         }
313
314         TALLOC_FREE(msg_ctx->remote);
315
316         if (lp_clustering()) {
317                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
318                                               &msg_ctx->remote);
319
320                 if (!NT_STATUS_IS_OK(status)) {
321                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
322                                   nt_errstr(status)));
323                         return status;
324                 }
325         }
326
327         return NT_STATUS_OK;
328 }
329
330
331 /*
332  * Register a dispatch function for a particular message type. Allow multiple
333  * registrants
334 */
335 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
336                             void *private_data,
337                             uint32_t msg_type,
338                             void (*fn)(struct messaging_context *msg,
339                                        void *private_data, 
340                                        uint32_t msg_type, 
341                                        struct server_id server_id,
342                                        DATA_BLOB *data))
343 {
344         struct messaging_callback *cb;
345
346         DEBUG(5, ("Registering messaging pointer for type %u - "
347                   "private_data=%p\n",
348                   (unsigned)msg_type, private_data));
349
350         /*
351          * Only one callback per type
352          */
353
354         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
355                 /* we allow a second registration of the same message
356                    type if it has a different private pointer. This is
357                    needed in, for example, the internal notify code,
358                    which creates a new notify context for each tree
359                    connect, and expects to receive messages to each of
360                    them. */
361                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
362                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
363                                   (unsigned)msg_type, private_data));
364                         cb->fn = fn;
365                         cb->private_data = private_data;
366                         return NT_STATUS_OK;
367                 }
368         }
369
370         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
371                 return NT_STATUS_NO_MEMORY;
372         }
373
374         cb->msg_type = msg_type;
375         cb->fn = fn;
376         cb->private_data = private_data;
377
378         DLIST_ADD(msg_ctx->callbacks, cb);
379         return NT_STATUS_OK;
380 }
381
382 /*
383   De-register the function for a particular message type.
384 */
385 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
386                           void *private_data)
387 {
388         struct messaging_callback *cb, *next;
389
390         for (cb = ctx->callbacks; cb; cb = next) {
391                 next = cb->next;
392                 if ((cb->msg_type == msg_type)
393                     && (cb->private_data == private_data)) {
394                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
395                                   (unsigned)msg_type, private_data));
396                         DLIST_REMOVE(ctx->callbacks, cb);
397                         TALLOC_FREE(cb);
398                 }
399         }
400 }
401
402 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
403                                    const struct server_id *dst)
404 {
405         return ((msg_ctx->id.vnn == dst->vnn) &&
406                 (msg_ctx->id.pid == dst->pid));
407 }
408
409 /*
410   Send a message to a particular server
411 */
412 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
413                         struct server_id server, uint32_t msg_type,
414                         const DATA_BLOB *data)
415 {
416         struct iovec iov;
417
418         iov.iov_base = data->data;
419         iov.iov_len = data->length;
420
421         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
422 }
423
424 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
425                             struct server_id server, uint32_t msg_type,
426                             const uint8_t *buf, size_t len)
427 {
428         DATA_BLOB blob = data_blob_const(buf, len);
429         return messaging_send(msg_ctx, server, msg_type, &blob);
430 }
431
432 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
433                             struct server_id server, uint32_t msg_type,
434                             const struct iovec *iov, int iovlen)
435 {
436         int ret;
437
438         if (server_id_is_disconnected(&server)) {
439                 return NT_STATUS_INVALID_PARAMETER_MIX;
440         }
441
442         if (!procid_is_local(&server)) {
443                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
444                                                msg_type, iov, iovlen,
445                                                msg_ctx->remote);
446                 if (ret != 0) {
447                         return map_nt_error_from_unix(ret);
448                 }
449                 return NT_STATUS_OK;
450         }
451
452         if (messaging_is_self_send(msg_ctx, &server)) {
453                 struct messaging_rec rec;
454                 uint8_t *buf;
455
456                 buf = iov_buf(talloc_tos(), iov, iovlen);
457                 if (buf == NULL) {
458                         return NT_STATUS_NO_MEMORY;
459                 }
460
461                 rec.msg_version = MESSAGE_VERSION;
462                 rec.msg_type = msg_type & MSG_TYPE_MASK;
463                 rec.dest = server;
464                 rec.src = msg_ctx->id;
465                 rec.buf = data_blob_const(buf, talloc_get_size(buf));
466                 messaging_dispatch_rec(msg_ctx, &rec);
467                 TALLOC_FREE(buf);
468                 return NT_STATUS_OK;
469         }
470
471         ret = msg_ctx->local->send_fn(msg_ctx->id, server, msg_type,
472                                       iov, iovlen, msg_ctx->local);
473         if (ret != 0) {
474                 return map_nt_error_from_unix(ret);
475         }
476         return NT_STATUS_OK;
477 }
478
479 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
480                                                struct messaging_rec *rec)
481 {
482         struct messaging_rec *result;
483
484         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
485                                       1, rec->buf.length);
486         if (result == NULL) {
487                 return NULL;
488         }
489         *result = *rec;
490
491         /* Doesn't fail, see talloc_pooled_object */
492
493         result->buf.data = talloc_memdup(result, rec->buf.data,
494                                          rec->buf.length);
495         return result;
496 }
497
498 struct messaging_filtered_read_state {
499         struct tevent_context *ev;
500         struct messaging_context *msg_ctx;
501         void *tevent_handle;
502
503         bool (*filter)(struct messaging_rec *rec, void *private_data);
504         void *private_data;
505
506         struct messaging_rec *rec;
507 };
508
509 static void messaging_filtered_read_cleanup(struct tevent_req *req,
510                                             enum tevent_req_state req_state);
511
512 struct tevent_req *messaging_filtered_read_send(
513         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
514         struct messaging_context *msg_ctx,
515         bool (*filter)(struct messaging_rec *rec, void *private_data),
516         void *private_data)
517 {
518         struct tevent_req *req;
519         struct messaging_filtered_read_state *state;
520         size_t new_waiters_len;
521
522         req = tevent_req_create(mem_ctx, &state,
523                                 struct messaging_filtered_read_state);
524         if (req == NULL) {
525                 return NULL;
526         }
527         state->ev = ev;
528         state->msg_ctx = msg_ctx;
529         state->filter = filter;
530         state->private_data = private_data;
531
532         /*
533          * We have to defer the callback here, as we might be called from
534          * within a different tevent_context than state->ev
535          */
536         tevent_req_defer_callback(req, state->ev);
537
538         state->tevent_handle = messaging_dgm_register_tevent_context(
539                 state, msg_ctx, ev);
540         if (tevent_req_nomem(state, req)) {
541                 return tevent_req_post(req, ev);
542         }
543
544         /*
545          * We add ourselves to the "new_waiters" array, not the "waiters"
546          * array. If we are called from within messaging_read_done,
547          * messaging_dispatch_rec will be in an active for-loop on
548          * "waiters". We must be careful not to mess with this array, because
549          * it could mean that a single event is being delivered twice.
550          */
551
552         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
553
554         if (new_waiters_len == msg_ctx->num_new_waiters) {
555                 struct tevent_req **tmp;
556
557                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
558                                      struct tevent_req *, new_waiters_len+1);
559                 if (tevent_req_nomem(tmp, req)) {
560                         return tevent_req_post(req, ev);
561                 }
562                 msg_ctx->new_waiters = tmp;
563         }
564
565         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
566         msg_ctx->num_new_waiters += 1;
567         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
568
569         return req;
570 }
571
572 static void messaging_filtered_read_cleanup(struct tevent_req *req,
573                                             enum tevent_req_state req_state)
574 {
575         struct messaging_filtered_read_state *state = tevent_req_data(
576                 req, struct messaging_filtered_read_state);
577         struct messaging_context *msg_ctx = state->msg_ctx;
578         unsigned i;
579
580         tevent_req_set_cleanup_fn(req, NULL);
581
582         TALLOC_FREE(state->tevent_handle);
583
584         /*
585          * Just set the [new_]waiters entry to NULL, be careful not to mess
586          * with the other "waiters" array contents. We are often called from
587          * within "messaging_dispatch_rec", which loops over
588          * "waiters". Messing with the "waiters" array will mess up that
589          * for-loop.
590          */
591
592         for (i=0; i<msg_ctx->num_waiters; i++) {
593                 if (msg_ctx->waiters[i] == req) {
594                         msg_ctx->waiters[i] = NULL;
595                         return;
596                 }
597         }
598
599         for (i=0; i<msg_ctx->num_new_waiters; i++) {
600                 if (msg_ctx->new_waiters[i] == req) {
601                         msg_ctx->new_waiters[i] = NULL;
602                         return;
603                 }
604         }
605 }
606
607 static void messaging_filtered_read_done(struct tevent_req *req,
608                                          struct messaging_rec *rec)
609 {
610         struct messaging_filtered_read_state *state = tevent_req_data(
611                 req, struct messaging_filtered_read_state);
612
613         state->rec = messaging_rec_dup(state, rec);
614         if (tevent_req_nomem(state->rec, req)) {
615                 return;
616         }
617         tevent_req_done(req);
618 }
619
620 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
621                                  struct messaging_rec **presult)
622 {
623         struct messaging_filtered_read_state *state = tevent_req_data(
624                 req, struct messaging_filtered_read_state);
625         int err;
626
627         if (tevent_req_is_unix_error(req, &err)) {
628                 tevent_req_received(req);
629                 return err;
630         }
631         *presult = talloc_move(mem_ctx, &state->rec);
632         return 0;
633 }
634
635 struct messaging_read_state {
636         uint32_t msg_type;
637         struct messaging_rec *rec;
638 };
639
640 static bool messaging_read_filter(struct messaging_rec *rec,
641                                   void *private_data);
642 static void messaging_read_done(struct tevent_req *subreq);
643
644 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
645                                        struct tevent_context *ev,
646                                        struct messaging_context *msg,
647                                        uint32_t msg_type)
648 {
649         struct tevent_req *req, *subreq;
650         struct messaging_read_state *state;
651
652         req = tevent_req_create(mem_ctx, &state,
653                                 struct messaging_read_state);
654         if (req == NULL) {
655                 return NULL;
656         }
657         state->msg_type = msg_type;
658
659         subreq = messaging_filtered_read_send(state, ev, msg,
660                                               messaging_read_filter, state);
661         if (tevent_req_nomem(subreq, req)) {
662                 return tevent_req_post(req, ev);
663         }
664         tevent_req_set_callback(subreq, messaging_read_done, req);
665         return req;
666 }
667
668 static bool messaging_read_filter(struct messaging_rec *rec,
669                                   void *private_data)
670 {
671         struct messaging_read_state *state = talloc_get_type_abort(
672                 private_data, struct messaging_read_state);
673
674         return rec->msg_type == state->msg_type;
675 }
676
677 static void messaging_read_done(struct tevent_req *subreq)
678 {
679         struct tevent_req *req = tevent_req_callback_data(
680                 subreq, struct tevent_req);
681         struct messaging_read_state *state = tevent_req_data(
682                 req, struct messaging_read_state);
683         int ret;
684
685         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
686         TALLOC_FREE(subreq);
687         if (tevent_req_error(req, ret)) {
688                 return;
689         }
690         tevent_req_done(req);
691 }
692
693 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
694                         struct messaging_rec **presult)
695 {
696         struct messaging_read_state *state = tevent_req_data(
697                 req, struct messaging_read_state);
698         int err;
699
700         if (tevent_req_is_unix_error(req, &err)) {
701                 return err;
702         }
703         if (presult != NULL) {
704                 *presult = talloc_move(mem_ctx, &state->rec);
705         }
706         return 0;
707 }
708
709 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
710 {
711         if (msg_ctx->num_new_waiters == 0) {
712                 return true;
713         }
714
715         if (talloc_array_length(msg_ctx->waiters) <
716             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
717                 struct tevent_req **tmp;
718                 tmp = talloc_realloc(
719                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
720                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
721                 if (tmp == NULL) {
722                         DEBUG(1, ("%s: talloc failed\n", __func__));
723                         return false;
724                 }
725                 msg_ctx->waiters = tmp;
726         }
727
728         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
729                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
730
731         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
732         msg_ctx->num_new_waiters = 0;
733
734         return true;
735 }
736
737 struct messaging_defer_callback_state {
738         struct messaging_context *msg_ctx;
739         struct messaging_rec *rec;
740         void (*fn)(struct messaging_context *msg, void *private_data,
741                    uint32_t msg_type, struct server_id server_id,
742                    DATA_BLOB *data);
743         void *private_data;
744 };
745
746 static void messaging_defer_callback_trigger(struct tevent_context *ev,
747                                              struct tevent_immediate *im,
748                                              void *private_data);
749
750 static void messaging_defer_callback(
751         struct messaging_context *msg_ctx, struct messaging_rec *rec,
752         void (*fn)(struct messaging_context *msg, void *private_data,
753                    uint32_t msg_type, struct server_id server_id,
754                    DATA_BLOB *data),
755         void *private_data)
756 {
757         struct messaging_defer_callback_state *state;
758         struct tevent_immediate *im;
759
760         state = talloc(msg_ctx, struct messaging_defer_callback_state);
761         if (state == NULL) {
762                 DEBUG(1, ("talloc failed\n"));
763                 return;
764         }
765         state->msg_ctx = msg_ctx;
766         state->fn = fn;
767         state->private_data = private_data;
768
769         state->rec = messaging_rec_dup(state, rec);
770         if (state->rec == NULL) {
771                 DEBUG(1, ("talloc failed\n"));
772                 TALLOC_FREE(state);
773                 return;
774         }
775
776         im = tevent_create_immediate(state);
777         if (im == NULL) {
778                 DEBUG(1, ("tevent_create_immediate failed\n"));
779                 TALLOC_FREE(state);
780                 return;
781         }
782         tevent_schedule_immediate(im, msg_ctx->event_ctx,
783                                   messaging_defer_callback_trigger, state);
784 }
785
786 static void messaging_defer_callback_trigger(struct tevent_context *ev,
787                                              struct tevent_immediate *im,
788                                              void *private_data)
789 {
790         struct messaging_defer_callback_state *state = talloc_get_type_abort(
791                 private_data, struct messaging_defer_callback_state);
792         struct messaging_rec *rec = state->rec;
793
794         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
795                   &rec->buf);
796         TALLOC_FREE(state);
797 }
798
799 /*
800   Dispatch one messaging_rec
801 */
802 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
803                             struct messaging_rec *rec)
804 {
805         struct messaging_callback *cb, *next;
806         unsigned i;
807
808         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
809                 next = cb->next;
810                 if (cb->msg_type != rec->msg_type) {
811                         continue;
812                 }
813
814                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
815                         /*
816                          * This is a self-send. We are called here from
817                          * messaging_send(), and we don't want to directly
818                          * recurse into the callback but go via a
819                          * tevent_loop_once
820                          */
821                         messaging_defer_callback(msg_ctx, rec, cb->fn,
822                                                  cb->private_data);
823                 } else {
824                         /*
825                          * This comes from a different process. we are called
826                          * from the event loop, so we should call back
827                          * directly.
828                          */
829                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
830                                rec->src, &rec->buf);
831                 }
832                 /*
833                  * we continue looking for matching messages after finding
834                  * one. This matters for subsystems like the internal notify
835                  * code which register more than one handler for the same
836                  * message type
837                  */
838         }
839
840         if (!messaging_append_new_waiters(msg_ctx)) {
841                 return;
842         }
843
844         i = 0;
845         while (i < msg_ctx->num_waiters) {
846                 struct tevent_req *req;
847                 struct messaging_filtered_read_state *state;
848
849                 req = msg_ctx->waiters[i];
850                 if (req == NULL) {
851                         /*
852                          * This got cleaned up. In the meantime,
853                          * move everything down one. We need
854                          * to keep the order of waiters, as
855                          * other code may depend on this.
856                          */
857                         if (i < msg_ctx->num_waiters - 1) {
858                                 memmove(&msg_ctx->waiters[i],
859                                         &msg_ctx->waiters[i+1],
860                                         sizeof(struct tevent_req *) *
861                                             (msg_ctx->num_waiters - i - 1));
862                         }
863                         msg_ctx->num_waiters -= 1;
864                         continue;
865                 }
866
867                 state = tevent_req_data(
868                         req, struct messaging_filtered_read_state);
869                 if (state->filter(rec, state->private_data)) {
870                         messaging_filtered_read_done(req, rec);
871                 }
872
873                 i += 1;
874         }
875 }
876
877 static int mess_parent_dgm_cleanup(void *private_data);
878 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
879
880 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
881 {
882         struct tevent_req *req;
883
884         req = background_job_send(
885                 msg, msg->event_ctx, msg, NULL, 0,
886                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
887                             60*15),
888                 mess_parent_dgm_cleanup, msg);
889         if (req == NULL) {
890                 return false;
891         }
892         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
893         return true;
894 }
895
896 static int mess_parent_dgm_cleanup(void *private_data)
897 {
898         struct messaging_context *msg_ctx = talloc_get_type_abort(
899                 private_data, struct messaging_context);
900         int ret;
901
902         ret = messaging_dgm_wipe(msg_ctx);
903         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
904                    ret ? strerror(ret) : "ok"));
905         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
906                            60*15);
907 }
908
909 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
910 {
911         struct messaging_context *msg = tevent_req_callback_data(
912                 req, struct messaging_context);
913         NTSTATUS status;
914
915         status = background_job_recv(req);
916         TALLOC_FREE(req);
917         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
918                   nt_errstr(status)));
919
920         req = background_job_send(
921                 msg, msg->event_ctx, msg, NULL, 0,
922                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
923                             60*15),
924                 mess_parent_dgm_cleanup, msg);
925         if (req == NULL) {
926                 DEBUG(1, ("background_job_send failed\n"));
927         }
928         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
929 }
930
931 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
932 {
933         int ret;
934
935         if (pid == 0) {
936                 ret = messaging_dgm_wipe(msg_ctx);
937         } else {
938                 ret = messaging_dgm_cleanup(msg_ctx, pid);
939         }
940
941         return ret;
942 }
943
944 struct messaging_backend *messaging_local_backend(
945         struct messaging_context *msg_ctx)
946 {
947         return msg_ctx->local;
948 }
949
950 struct tevent_context *messaging_tevent_context(
951         struct messaging_context *msg_ctx)
952 {
953         return msg_ctx->event_ctx;
954 }
955
956 /** @} **/