lib: Use talloc_memdup in messaging_rec_dup
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57
58 struct messaging_callback {
59         struct messaging_callback *prev, *next;
60         uint32 msg_type;
61         void (*fn)(struct messaging_context *msg, void *private_data, 
62                    uint32_t msg_type, 
63                    struct server_id server_id, DATA_BLOB *data);
64         void *private_data;
65 };
66
67 struct messaging_context {
68         struct server_id id;
69         struct tevent_context *event_ctx;
70         struct messaging_callback *callbacks;
71
72         struct tevent_req **new_waiters;
73         unsigned num_new_waiters;
74
75         struct tevent_req **waiters;
76         unsigned num_waiters;
77
78         struct messaging_backend *remote;
79
80         struct server_id_db *names_db;
81 };
82
83 struct messaging_hdr {
84         uint32_t msg_type;
85         struct server_id dst;
86         struct server_id src;
87 };
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32 msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         const struct messaging_hdr *hdr;
217         struct server_id_buf idbuf;
218         struct messaging_rec rec;
219         int64_t fds64[MIN(num_fds, INT8_MAX)];
220         size_t i;
221
222         if (msg_len < sizeof(*hdr)) {
223                 for (i=0; i < num_fds; i++) {
224                         close(fds[i]);
225                 }
226                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
227                 return;
228         }
229
230         if (num_fds > INT8_MAX) {
231                 for (i=0; i < num_fds; i++) {
232                         close(fds[i]);
233                 }
234                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
235                 return;
236         }
237
238         /*
239          * "consume" the fds by copying them and setting
240          * the original variable to -1
241          */
242         for (i=0; i < num_fds; i++) {
243                 fds64[i] = fds[i];
244                 fds[i] = -1;
245         }
246
247         /*
248          * messages_dgm guarantees alignment, so we can cast here
249          */
250         hdr = (const struct messaging_hdr *)msg;
251
252         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
253                    __func__, (unsigned)hdr->msg_type,
254                    (unsigned)(msg_len - sizeof(*hdr)),
255                    (unsigned)num_fds,
256                    server_id_str_buf(hdr->src, &idbuf)));
257
258         rec = (struct messaging_rec) {
259                 .msg_version = MESSAGE_VERSION,
260                 .msg_type = hdr->msg_type,
261                 .src = hdr->src,
262                 .dest = hdr->dst,
263                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
264                 .buf.length = msg_len - sizeof(*hdr),
265                 .num_fds = num_fds,
266                 .fds = fds64,
267         };
268
269         messaging_dispatch_rec(msg_ctx, &rec);
270 }
271
272 static int messaging_context_destructor(struct messaging_context *ctx)
273 {
274         unsigned i;
275
276         messaging_dgm_destroy();
277
278         for (i=0; i<ctx->num_new_waiters; i++) {
279                 if (ctx->new_waiters[i] != NULL) {
280                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
281                         ctx->new_waiters[i] = NULL;
282                 }
283         }
284         for (i=0; i<ctx->num_waiters; i++) {
285                 if (ctx->waiters[i] != NULL) {
286                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
287                         ctx->waiters[i] = NULL;
288                 }
289         }
290
291         return 0;
292 }
293
294 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
295                                          struct tevent_context *ev)
296 {
297         struct messaging_context *ctx;
298         NTSTATUS status;
299         int ret;
300
301         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
302                 return NULL;
303         }
304
305         ctx->id = procid_self();
306         ctx->event_ctx = ev;
307
308         sec_init();
309
310         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
311                                  lp_cache_directory(), sec_initial_uid(),
312                                  messaging_recv_cb, ctx);
313
314         if (ret != 0) {
315                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
316                 TALLOC_FREE(ctx);
317                 return NULL;
318         }
319
320         ctx->names_db = server_id_db_init(
321                 ctx, ctx->id, lp_cache_directory(), 0,
322                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
323         if (ctx->names_db == NULL) {
324                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
325                 TALLOC_FREE(ctx);
326                 return NULL;
327         }
328
329         talloc_set_destructor(ctx, messaging_context_destructor);
330
331         if (lp_clustering()) {
332                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
333
334                 if (!NT_STATUS_IS_OK(status)) {
335                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
336                                   nt_errstr(status)));
337                         TALLOC_FREE(ctx);
338                         return NULL;
339                 }
340         }
341         ctx->id.vnn = get_my_vnn();
342
343         messaging_register(ctx, NULL, MSG_PING, ping_message);
344
345         /* Register some debugging related messages */
346
347         register_msg_pool_usage(ctx);
348         register_dmalloc_msgs(ctx);
349         debug_register_msgs(ctx);
350
351         return ctx;
352 }
353
354 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
355 {
356         return msg_ctx->id;
357 }
358
359 /*
360  * re-init after a fork
361  */
362 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
363 {
364         NTSTATUS status;
365         int ret;
366
367         messaging_dgm_destroy();
368
369         msg_ctx->id = procid_self();
370
371         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
372                                  lp_cache_directory(), sec_initial_uid(),
373                                  messaging_recv_cb, msg_ctx);
374         if (ret != 0) {
375                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
376                 return map_nt_error_from_unix(ret);
377         }
378
379         TALLOC_FREE(msg_ctx->remote);
380
381         if (lp_clustering()) {
382                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
383                                               &msg_ctx->remote);
384
385                 if (!NT_STATUS_IS_OK(status)) {
386                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
387                                   nt_errstr(status)));
388                         return status;
389                 }
390         }
391
392         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
393
394         return NT_STATUS_OK;
395 }
396
397
398 /*
399  * Register a dispatch function for a particular message type. Allow multiple
400  * registrants
401 */
402 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
403                             void *private_data,
404                             uint32_t msg_type,
405                             void (*fn)(struct messaging_context *msg,
406                                        void *private_data, 
407                                        uint32_t msg_type, 
408                                        struct server_id server_id,
409                                        DATA_BLOB *data))
410 {
411         struct messaging_callback *cb;
412
413         DEBUG(5, ("Registering messaging pointer for type %u - "
414                   "private_data=%p\n",
415                   (unsigned)msg_type, private_data));
416
417         /*
418          * Only one callback per type
419          */
420
421         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
422                 /* we allow a second registration of the same message
423                    type if it has a different private pointer. This is
424                    needed in, for example, the internal notify code,
425                    which creates a new notify context for each tree
426                    connect, and expects to receive messages to each of
427                    them. */
428                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
429                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
430                                   (unsigned)msg_type, private_data));
431                         cb->fn = fn;
432                         cb->private_data = private_data;
433                         return NT_STATUS_OK;
434                 }
435         }
436
437         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
438                 return NT_STATUS_NO_MEMORY;
439         }
440
441         cb->msg_type = msg_type;
442         cb->fn = fn;
443         cb->private_data = private_data;
444
445         DLIST_ADD(msg_ctx->callbacks, cb);
446         return NT_STATUS_OK;
447 }
448
449 /*
450   De-register the function for a particular message type.
451 */
452 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
453                           void *private_data)
454 {
455         struct messaging_callback *cb, *next;
456
457         for (cb = ctx->callbacks; cb; cb = next) {
458                 next = cb->next;
459                 if ((cb->msg_type == msg_type)
460                     && (cb->private_data == private_data)) {
461                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
462                                   (unsigned)msg_type, private_data));
463                         DLIST_REMOVE(ctx->callbacks, cb);
464                         TALLOC_FREE(cb);
465                 }
466         }
467 }
468
469 /*
470   Send a message to a particular server
471 */
472 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
473                         struct server_id server, uint32_t msg_type,
474                         const DATA_BLOB *data)
475 {
476         struct iovec iov;
477
478         iov.iov_base = data->data;
479         iov.iov_len = data->length;
480
481         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
482 }
483
484 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
485                             struct server_id server, uint32_t msg_type,
486                             const uint8_t *buf, size_t len)
487 {
488         DATA_BLOB blob = data_blob_const(buf, len);
489         return messaging_send(msg_ctx, server, msg_type, &blob);
490 }
491
492 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
493                                  struct server_id src, struct server_id dst,
494                                  uint32_t msg_type,
495                                  const struct iovec *iov, int iovlen,
496                                  const int *fds, size_t num_fds)
497 {
498         int ret;
499         struct messaging_hdr hdr;
500         struct iovec iov2[iovlen+1];
501
502         if (server_id_is_disconnected(&dst)) {
503                 return NT_STATUS_INVALID_PARAMETER_MIX;
504         }
505
506         if (num_fds > INT8_MAX) {
507                 return NT_STATUS_INVALID_PARAMETER_MIX;
508         }
509
510         if (!procid_is_local(&dst)) {
511                 if (num_fds > 0) {
512                         return NT_STATUS_NOT_SUPPORTED;
513                 }
514
515                 ret = msg_ctx->remote->send_fn(src, dst,
516                                                msg_type, iov, iovlen,
517                                                NULL, 0,
518                                                msg_ctx->remote);
519                 if (ret != 0) {
520                         return map_nt_error_from_unix(ret);
521                 }
522                 return NT_STATUS_OK;
523         }
524
525         ZERO_STRUCT(hdr);
526         hdr = (struct messaging_hdr) {
527                 .msg_type = msg_type,
528                 .dst = dst,
529                 .src = src
530         };
531         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
532         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
533
534         become_root();
535         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
536         unbecome_root();
537
538         if (ret != 0) {
539                 return map_nt_error_from_unix(ret);
540         }
541         return NT_STATUS_OK;
542 }
543
544 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
545                             struct server_id dst, uint32_t msg_type,
546                             const struct iovec *iov, int iovlen,
547                             const int *fds, size_t num_fds)
548 {
549         return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
550                                        iov, iovlen, fds, num_fds);
551 }
552
553 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
554                                                struct messaging_rec *rec)
555 {
556         struct messaging_rec *result;
557         size_t fds_size = sizeof(int64_t) * rec->num_fds;
558
559         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
560                                       rec->buf.length + fds_size);
561         if (result == NULL) {
562                 return NULL;
563         }
564         *result = *rec;
565
566         /* Doesn't fail, see talloc_pooled_object */
567
568         result->buf.data = talloc_memdup(result, rec->buf.data,
569                                          rec->buf.length);
570
571         result->fds = NULL;
572         if (result->num_fds > 0) {
573                 result->fds = talloc_memdup(result, rec->fds, fds_size);
574         }
575
576         return result;
577 }
578
579 struct messaging_filtered_read_state {
580         struct tevent_context *ev;
581         struct messaging_context *msg_ctx;
582         void *tevent_handle;
583
584         bool (*filter)(struct messaging_rec *rec, void *private_data);
585         void *private_data;
586
587         struct messaging_rec *rec;
588 };
589
590 static void messaging_filtered_read_cleanup(struct tevent_req *req,
591                                             enum tevent_req_state req_state);
592
593 struct tevent_req *messaging_filtered_read_send(
594         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
595         struct messaging_context *msg_ctx,
596         bool (*filter)(struct messaging_rec *rec, void *private_data),
597         void *private_data)
598 {
599         struct tevent_req *req;
600         struct messaging_filtered_read_state *state;
601         size_t new_waiters_len;
602
603         req = tevent_req_create(mem_ctx, &state,
604                                 struct messaging_filtered_read_state);
605         if (req == NULL) {
606                 return NULL;
607         }
608         state->ev = ev;
609         state->msg_ctx = msg_ctx;
610         state->filter = filter;
611         state->private_data = private_data;
612
613         /*
614          * We have to defer the callback here, as we might be called from
615          * within a different tevent_context than state->ev
616          */
617         tevent_req_defer_callback(req, state->ev);
618
619         state->tevent_handle = messaging_dgm_register_tevent_context(
620                 state, ev);
621         if (tevent_req_nomem(state, req)) {
622                 return tevent_req_post(req, ev);
623         }
624
625         /*
626          * We add ourselves to the "new_waiters" array, not the "waiters"
627          * array. If we are called from within messaging_read_done,
628          * messaging_dispatch_rec will be in an active for-loop on
629          * "waiters". We must be careful not to mess with this array, because
630          * it could mean that a single event is being delivered twice.
631          */
632
633         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
634
635         if (new_waiters_len == msg_ctx->num_new_waiters) {
636                 struct tevent_req **tmp;
637
638                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
639                                      struct tevent_req *, new_waiters_len+1);
640                 if (tevent_req_nomem(tmp, req)) {
641                         return tevent_req_post(req, ev);
642                 }
643                 msg_ctx->new_waiters = tmp;
644         }
645
646         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
647         msg_ctx->num_new_waiters += 1;
648         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
649
650         return req;
651 }
652
653 static void messaging_filtered_read_cleanup(struct tevent_req *req,
654                                             enum tevent_req_state req_state)
655 {
656         struct messaging_filtered_read_state *state = tevent_req_data(
657                 req, struct messaging_filtered_read_state);
658         struct messaging_context *msg_ctx = state->msg_ctx;
659         unsigned i;
660
661         tevent_req_set_cleanup_fn(req, NULL);
662
663         TALLOC_FREE(state->tevent_handle);
664
665         /*
666          * Just set the [new_]waiters entry to NULL, be careful not to mess
667          * with the other "waiters" array contents. We are often called from
668          * within "messaging_dispatch_rec", which loops over
669          * "waiters". Messing with the "waiters" array will mess up that
670          * for-loop.
671          */
672
673         for (i=0; i<msg_ctx->num_waiters; i++) {
674                 if (msg_ctx->waiters[i] == req) {
675                         msg_ctx->waiters[i] = NULL;
676                         return;
677                 }
678         }
679
680         for (i=0; i<msg_ctx->num_new_waiters; i++) {
681                 if (msg_ctx->new_waiters[i] == req) {
682                         msg_ctx->new_waiters[i] = NULL;
683                         return;
684                 }
685         }
686 }
687
688 static void messaging_filtered_read_done(struct tevent_req *req,
689                                          struct messaging_rec *rec)
690 {
691         struct messaging_filtered_read_state *state = tevent_req_data(
692                 req, struct messaging_filtered_read_state);
693
694         state->rec = messaging_rec_dup(state, rec);
695         if (tevent_req_nomem(state->rec, req)) {
696                 return;
697         }
698         tevent_req_done(req);
699 }
700
701 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
702                                  struct messaging_rec **presult)
703 {
704         struct messaging_filtered_read_state *state = tevent_req_data(
705                 req, struct messaging_filtered_read_state);
706         int err;
707
708         if (tevent_req_is_unix_error(req, &err)) {
709                 tevent_req_received(req);
710                 return err;
711         }
712         *presult = talloc_move(mem_ctx, &state->rec);
713         return 0;
714 }
715
716 struct messaging_read_state {
717         uint32_t msg_type;
718         struct messaging_rec *rec;
719 };
720
721 static bool messaging_read_filter(struct messaging_rec *rec,
722                                   void *private_data);
723 static void messaging_read_done(struct tevent_req *subreq);
724
725 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
726                                        struct tevent_context *ev,
727                                        struct messaging_context *msg,
728                                        uint32_t msg_type)
729 {
730         struct tevent_req *req, *subreq;
731         struct messaging_read_state *state;
732
733         req = tevent_req_create(mem_ctx, &state,
734                                 struct messaging_read_state);
735         if (req == NULL) {
736                 return NULL;
737         }
738         state->msg_type = msg_type;
739
740         subreq = messaging_filtered_read_send(state, ev, msg,
741                                               messaging_read_filter, state);
742         if (tevent_req_nomem(subreq, req)) {
743                 return tevent_req_post(req, ev);
744         }
745         tevent_req_set_callback(subreq, messaging_read_done, req);
746         return req;
747 }
748
749 static bool messaging_read_filter(struct messaging_rec *rec,
750                                   void *private_data)
751 {
752         struct messaging_read_state *state = talloc_get_type_abort(
753                 private_data, struct messaging_read_state);
754
755         if (rec->num_fds != 0) {
756                 return false;
757         }
758
759         return rec->msg_type == state->msg_type;
760 }
761
762 static void messaging_read_done(struct tevent_req *subreq)
763 {
764         struct tevent_req *req = tevent_req_callback_data(
765                 subreq, struct tevent_req);
766         struct messaging_read_state *state = tevent_req_data(
767                 req, struct messaging_read_state);
768         int ret;
769
770         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
771         TALLOC_FREE(subreq);
772         if (tevent_req_error(req, ret)) {
773                 return;
774         }
775         tevent_req_done(req);
776 }
777
778 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
779                         struct messaging_rec **presult)
780 {
781         struct messaging_read_state *state = tevent_req_data(
782                 req, struct messaging_read_state);
783         int err;
784
785         if (tevent_req_is_unix_error(req, &err)) {
786                 return err;
787         }
788         if (presult != NULL) {
789                 *presult = talloc_move(mem_ctx, &state->rec);
790         }
791         return 0;
792 }
793
794 struct messaging_handler_state {
795         struct tevent_context *ev;
796         struct messaging_context *msg_ctx;
797         uint32_t msg_type;
798         bool (*handler)(struct messaging_context *msg_ctx,
799                         struct messaging_rec **rec, void *private_data);
800         void *private_data;
801 };
802
803 static void messaging_handler_got_msg(struct tevent_req *subreq);
804
805 struct tevent_req *messaging_handler_send(
806         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
807         struct messaging_context *msg_ctx, uint32_t msg_type,
808         bool (*handler)(struct messaging_context *msg_ctx,
809                         struct messaging_rec **rec, void *private_data),
810         void *private_data)
811 {
812         struct tevent_req *req, *subreq;
813         struct messaging_handler_state *state;
814
815         req = tevent_req_create(mem_ctx, &state,
816                                 struct messaging_handler_state);
817         if (req == NULL) {
818                 return NULL;
819         }
820         state->ev = ev;
821         state->msg_ctx = msg_ctx;
822         state->msg_type = msg_type;
823         state->handler = handler;
824         state->private_data = private_data;
825
826         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
827                                      state->msg_type);
828         if (tevent_req_nomem(subreq, req)) {
829                 return tevent_req_post(req, ev);
830         }
831         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
832         return req;
833 }
834
835 static void messaging_handler_got_msg(struct tevent_req *subreq)
836 {
837         struct tevent_req *req = tevent_req_callback_data(
838                 subreq, struct tevent_req);
839         struct messaging_handler_state *state = tevent_req_data(
840                 req, struct messaging_handler_state);
841         struct messaging_rec *rec;
842         int ret;
843         bool ok;
844
845         ret = messaging_read_recv(subreq, state, &rec);
846         TALLOC_FREE(subreq);
847         if (tevent_req_error(req, ret)) {
848                 return;
849         }
850
851         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
852                                      state->msg_type);
853         if (tevent_req_nomem(subreq, req)) {
854                 return;
855         }
856         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
857
858         ok = state->handler(state->msg_ctx, &rec, state->private_data);
859         TALLOC_FREE(rec);
860         if (ok) {
861                 /*
862                  * Next round
863                  */
864                 return;
865         }
866         TALLOC_FREE(subreq);
867         tevent_req_done(req);
868 }
869
870 int messaging_handler_recv(struct tevent_req *req)
871 {
872         return tevent_req_simple_recv_unix(req);
873 }
874
875 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
876 {
877         if (msg_ctx->num_new_waiters == 0) {
878                 return true;
879         }
880
881         if (talloc_array_length(msg_ctx->waiters) <
882             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
883                 struct tevent_req **tmp;
884                 tmp = talloc_realloc(
885                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
886                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
887                 if (tmp == NULL) {
888                         DEBUG(1, ("%s: talloc failed\n", __func__));
889                         return false;
890                 }
891                 msg_ctx->waiters = tmp;
892         }
893
894         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
895                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
896
897         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
898         msg_ctx->num_new_waiters = 0;
899
900         return true;
901 }
902
903 /*
904   Dispatch one messaging_rec
905 */
906 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
907                             struct messaging_rec *rec)
908 {
909         struct messaging_callback *cb, *next;
910         unsigned i;
911         size_t j;
912
913         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
914                 next = cb->next;
915                 if (cb->msg_type != rec->msg_type) {
916                         continue;
917                 }
918
919                 /*
920                  * the old style callbacks don't support fd passing
921                  */
922                 for (j=0; j < rec->num_fds; j++) {
923                         int fd = rec->fds[j];
924                         close(fd);
925                 }
926                 rec->num_fds = 0;
927                 rec->fds = NULL;
928
929                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
930                        rec->src, &rec->buf);
931
932                 /*
933                  * we continue looking for matching messages after finding
934                  * one. This matters for subsystems like the internal notify
935                  * code which register more than one handler for the same
936                  * message type
937                  */
938         }
939
940         if (!messaging_append_new_waiters(msg_ctx)) {
941                 for (j=0; j < rec->num_fds; j++) {
942                         int fd = rec->fds[j];
943                         close(fd);
944                 }
945                 rec->num_fds = 0;
946                 rec->fds = NULL;
947                 return;
948         }
949
950         i = 0;
951         while (i < msg_ctx->num_waiters) {
952                 struct tevent_req *req;
953                 struct messaging_filtered_read_state *state;
954
955                 req = msg_ctx->waiters[i];
956                 if (req == NULL) {
957                         /*
958                          * This got cleaned up. In the meantime,
959                          * move everything down one. We need
960                          * to keep the order of waiters, as
961                          * other code may depend on this.
962                          */
963                         if (i < msg_ctx->num_waiters - 1) {
964                                 memmove(&msg_ctx->waiters[i],
965                                         &msg_ctx->waiters[i+1],
966                                         sizeof(struct tevent_req *) *
967                                             (msg_ctx->num_waiters - i - 1));
968                         }
969                         msg_ctx->num_waiters -= 1;
970                         continue;
971                 }
972
973                 state = tevent_req_data(
974                         req, struct messaging_filtered_read_state);
975                 if (state->filter(rec, state->private_data)) {
976                         messaging_filtered_read_done(req, rec);
977
978                         /*
979                          * Only the first one gets the fd-array
980                          */
981                         rec->num_fds = 0;
982                         rec->fds = NULL;
983                 }
984
985                 i += 1;
986         }
987
988         /*
989          * If the fd-array isn't used, just close it.
990          */
991         for (j=0; j < rec->num_fds; j++) {
992                 int fd = rec->fds[j];
993                 close(fd);
994         }
995         rec->num_fds = 0;
996         rec->fds = NULL;
997 }
998
999 static int mess_parent_dgm_cleanup(void *private_data);
1000 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1001
1002 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1003 {
1004         struct tevent_req *req;
1005
1006         req = background_job_send(
1007                 msg, msg->event_ctx, msg, NULL, 0,
1008                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1009                             60*15),
1010                 mess_parent_dgm_cleanup, msg);
1011         if (req == NULL) {
1012                 return false;
1013         }
1014         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1015         return true;
1016 }
1017
1018 static int mess_parent_dgm_cleanup(void *private_data)
1019 {
1020         int ret;
1021
1022         ret = messaging_dgm_wipe();
1023         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1024                    ret ? strerror(ret) : "ok"));
1025         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1026                            60*15);
1027 }
1028
1029 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1030 {
1031         struct messaging_context *msg = tevent_req_callback_data(
1032                 req, struct messaging_context);
1033         NTSTATUS status;
1034
1035         status = background_job_recv(req);
1036         TALLOC_FREE(req);
1037         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1038                   nt_errstr(status)));
1039
1040         req = background_job_send(
1041                 msg, msg->event_ctx, msg, NULL, 0,
1042                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1043                             60*15),
1044                 mess_parent_dgm_cleanup, msg);
1045         if (req == NULL) {
1046                 DEBUG(1, ("background_job_send failed\n"));
1047         }
1048         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1049 }
1050
1051 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1052 {
1053         int ret;
1054
1055         if (pid == 0) {
1056                 ret = messaging_dgm_wipe();
1057         } else {
1058                 ret = messaging_dgm_cleanup(pid);
1059         }
1060
1061         return ret;
1062 }
1063
1064 struct tevent_context *messaging_tevent_context(
1065         struct messaging_context *msg_ctx)
1066 {
1067         return msg_ctx->event_ctx;
1068 }
1069
1070 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1071 {
1072         return msg_ctx->names_db;
1073 }
1074
1075 /** @} **/