messaging: Optimize self-sends
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
87                                                struct messaging_rec *rec);
88 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
89                                    struct messaging_rec *rec);
90
91 /****************************************************************************
92  A useful function for testing the message system.
93 ****************************************************************************/
94
95 static void ping_message(struct messaging_context *msg_ctx,
96                          void *private_data,
97                          uint32_t msg_type,
98                          struct server_id src,
99                          DATA_BLOB *data)
100 {
101         struct server_id_buf idbuf;
102
103         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
104                   server_id_str_buf(src, &idbuf), (int)data->length,
105                   data->data ? (char *)data->data : ""));
106
107         messaging_send(msg_ctx, src, MSG_PONG, data);
108 }
109
110 static struct messaging_rec *messaging_rec_create(
111         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
112         uint32_t msg_type, const struct iovec *iov, int iovlen,
113         const int *fds, size_t num_fds)
114 {
115         ssize_t buflen;
116         uint8_t *buf;
117         struct messaging_rec *result;
118
119         if (num_fds > INT8_MAX) {
120                 return NULL;
121         }
122
123         buflen = iov_buflen(iov, iovlen);
124         if (buflen == -1) {
125                 return NULL;
126         }
127         buf = talloc_array(mem_ctx, uint8_t, buflen);
128         if (buf == NULL) {
129                 return NULL;
130         }
131         iov_buf(iov, iovlen, buf, buflen);
132
133         {
134                 struct messaging_rec rec;
135                 int64_t fds64[num_fds];
136                 size_t i;
137
138                 for (i=0; i<num_fds; i++) {
139                         fds64[i] = fds[i];
140                 }
141
142                 rec = (struct messaging_rec) {
143                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
144                         .src = src, .dest = dst,
145                         .buf.data = buf, .buf.length = buflen,
146                         .num_fds = num_fds, .fds = fds64,
147                 };
148
149                 result = messaging_rec_dup(mem_ctx, &rec);
150         }
151
152         TALLOC_FREE(buf);
153
154         return result;
155 }
156
157 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
158                               int *fds, size_t num_fds,
159                               void *private_data)
160 {
161         struct messaging_context *msg_ctx = talloc_get_type_abort(
162                 private_data, struct messaging_context);
163         struct server_id_buf idbuf;
164         struct messaging_rec rec;
165         int64_t fds64[MIN(num_fds, INT8_MAX)];
166         size_t i;
167
168         if (msg_len < MESSAGE_HDR_LENGTH) {
169                 DBG_WARNING("message too short: %zu\n", msg_len);
170                 goto close_fail;
171         }
172
173         if (num_fds > INT8_MAX) {
174                 DBG_WARNING("too many fds: %zu\n", num_fds);
175                 goto close_fail;
176         }
177
178         /*
179          * "consume" the fds by copying them and setting
180          * the original variable to -1
181          */
182         for (i=0; i < num_fds; i++) {
183                 fds64[i] = fds[i];
184                 fds[i] = -1;
185         }
186
187         rec = (struct messaging_rec) {
188                 .msg_version = MESSAGE_VERSION,
189                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
190                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
191                 .num_fds = num_fds,
192                 .fds = fds64,
193         };
194
195         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
196
197         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
198                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
199                   server_id_str_buf(rec.src, &idbuf));
200
201         messaging_dispatch_rec(msg_ctx, &rec);
202         return;
203
204 close_fail:
205         for (i=0; i < num_fds; i++) {
206                 close(fds[i]);
207         }
208 }
209
210 static int messaging_context_destructor(struct messaging_context *ctx)
211 {
212         unsigned i;
213
214         for (i=0; i<ctx->num_new_waiters; i++) {
215                 if (ctx->new_waiters[i] != NULL) {
216                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
217                         ctx->new_waiters[i] = NULL;
218                 }
219         }
220         for (i=0; i<ctx->num_waiters; i++) {
221                 if (ctx->waiters[i] != NULL) {
222                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
223                         ctx->waiters[i] = NULL;
224                 }
225         }
226
227         return 0;
228 }
229
230 static const char *private_path(const char *name)
231 {
232         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
233 }
234
235 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
236                                          struct tevent_context *ev)
237 {
238         struct messaging_context *ctx;
239         int ret;
240         const char *lck_path;
241         const char *priv_path;
242         bool ok;
243
244         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
245                 return NULL;
246         }
247
248         ctx->id = (struct server_id) {
249                 .pid = getpid(), .vnn = NONCLUSTER_VNN
250         };
251
252         ctx->event_ctx = ev;
253
254         sec_init();
255
256         lck_path = lock_path("msg.lock");
257         if (lck_path == NULL) {
258                 TALLOC_FREE(ctx);
259                 return NULL;
260         }
261
262         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
263                                               0755);
264         if (!ok) {
265                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
266                            __func__, strerror(errno)));
267                 TALLOC_FREE(ctx);
268                 return NULL;
269         }
270
271         priv_path = private_path("msg.sock");
272         if (priv_path == NULL) {
273                 TALLOC_FREE(ctx);
274                 return NULL;
275         }
276
277         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
278                                               0700);
279         if (!ok) {
280                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
281                            __func__, strerror(errno)));
282                 TALLOC_FREE(ctx);
283                 return NULL;
284         }
285
286         ctx->msg_dgm_ref = messaging_dgm_ref(
287                 ctx, ctx->event_ctx, &ctx->id.unique_id,
288                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
289
290         if (ctx->msg_dgm_ref == NULL) {
291                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
292                 TALLOC_FREE(ctx);
293                 return NULL;
294         }
295
296         talloc_set_destructor(ctx, messaging_context_destructor);
297
298         if (lp_clustering()) {
299                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
300
301                 if (ret != 0) {
302                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
303                                   strerror(ret)));
304                         TALLOC_FREE(ctx);
305                         return NULL;
306                 }
307         }
308         ctx->id.vnn = get_my_vnn();
309
310         ctx->names_db = server_id_db_init(
311                 ctx, ctx->id, lp_lock_directory(), 0,
312                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
313         if (ctx->names_db == NULL) {
314                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
315                 TALLOC_FREE(ctx);
316                 return NULL;
317         }
318
319         messaging_register(ctx, NULL, MSG_PING, ping_message);
320
321         /* Register some debugging related messages */
322
323         register_msg_pool_usage(ctx);
324         register_dmalloc_msgs(ctx);
325         debug_register_msgs(ctx);
326
327         {
328                 struct server_id_buf tmp;
329                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
330         }
331
332         return ctx;
333 }
334
335 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
336 {
337         return msg_ctx->id;
338 }
339
340 /*
341  * re-init after a fork
342  */
343 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
344 {
345         int ret;
346         char *lck_path;
347
348         TALLOC_FREE(msg_ctx->msg_dgm_ref);
349
350         msg_ctx->id = (struct server_id) {
351                 .pid = getpid(), .vnn = msg_ctx->id.vnn
352         };
353
354         lck_path = lock_path("msg.lock");
355         if (lck_path == NULL) {
356                 return NT_STATUS_NO_MEMORY;
357         }
358
359         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
360                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
361                 private_path("msg.sock"), lck_path,
362                 messaging_recv_cb, msg_ctx, &ret);
363
364         if (msg_ctx->msg_dgm_ref == NULL) {
365                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
366                 return map_nt_error_from_unix(ret);
367         }
368
369         if (lp_clustering()) {
370                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
371                                              msg_ctx->remote);
372
373                 if (ret != 0) {
374                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
375                                   strerror(ret)));
376                         return map_nt_error_from_unix(ret);
377                 }
378         }
379
380         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
381
382         return NT_STATUS_OK;
383 }
384
385
386 /*
387  * Register a dispatch function for a particular message type. Allow multiple
388  * registrants
389 */
390 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
391                             void *private_data,
392                             uint32_t msg_type,
393                             void (*fn)(struct messaging_context *msg,
394                                        void *private_data, 
395                                        uint32_t msg_type, 
396                                        struct server_id server_id,
397                                        DATA_BLOB *data))
398 {
399         struct messaging_callback *cb;
400
401         DEBUG(5, ("Registering messaging pointer for type %u - "
402                   "private_data=%p\n",
403                   (unsigned)msg_type, private_data));
404
405         /*
406          * Only one callback per type
407          */
408
409         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
410                 /* we allow a second registration of the same message
411                    type if it has a different private pointer. This is
412                    needed in, for example, the internal notify code,
413                    which creates a new notify context for each tree
414                    connect, and expects to receive messages to each of
415                    them. */
416                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
417                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
418                                   (unsigned)msg_type, private_data));
419                         cb->fn = fn;
420                         cb->private_data = private_data;
421                         return NT_STATUS_OK;
422                 }
423         }
424
425         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
426                 return NT_STATUS_NO_MEMORY;
427         }
428
429         cb->msg_type = msg_type;
430         cb->fn = fn;
431         cb->private_data = private_data;
432
433         DLIST_ADD(msg_ctx->callbacks, cb);
434         return NT_STATUS_OK;
435 }
436
437 /*
438   De-register the function for a particular message type.
439 */
440 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
441                           void *private_data)
442 {
443         struct messaging_callback *cb, *next;
444
445         for (cb = ctx->callbacks; cb; cb = next) {
446                 next = cb->next;
447                 if ((cb->msg_type == msg_type)
448                     && (cb->private_data == private_data)) {
449                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
450                                   (unsigned)msg_type, private_data));
451                         DLIST_REMOVE(ctx->callbacks, cb);
452                         TALLOC_FREE(cb);
453                 }
454         }
455 }
456
457 /*
458   Send a message to a particular server
459 */
460 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
461                         struct server_id server, uint32_t msg_type,
462                         const DATA_BLOB *data)
463 {
464         struct iovec iov = {0};
465
466         if (data != NULL) {
467                 iov.iov_base = data->data;
468                 iov.iov_len = data->length;
469         };
470
471         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
472 }
473
474 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
475                             struct server_id server, uint32_t msg_type,
476                             const uint8_t *buf, size_t len)
477 {
478         DATA_BLOB blob = data_blob_const(buf, len);
479         return messaging_send(msg_ctx, server, msg_type, &blob);
480 }
481
482 struct messaging_post_state {
483         struct messaging_context *msg_ctx;
484         struct messaging_rec *rec;
485 };
486
487 static void messaging_post_handler(struct tevent_context *ev,
488                                    struct tevent_immediate *ti,
489                                    void *private_data);
490
491 static int messaging_post_self(struct messaging_context *msg_ctx,
492                                struct server_id src, struct server_id dst,
493                                uint32_t msg_type,
494                                const struct iovec *iov, int iovlen,
495                                const int *fds, size_t num_fds)
496 {
497         struct tevent_immediate *ti;
498         struct messaging_post_state *state;
499
500         state = talloc(msg_ctx, struct messaging_post_state);
501         if (state == NULL) {
502                 return ENOMEM;
503         }
504         state->msg_ctx = msg_ctx;
505
506         ti = tevent_create_immediate(state);
507         if (ti == NULL) {
508                 goto fail;
509         }
510         state->rec = messaging_rec_create(
511                 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
512         if (state->rec == NULL) {
513                 goto fail;
514         }
515
516         tevent_schedule_immediate(ti, msg_ctx->event_ctx,
517                                   messaging_post_handler, state);
518         return 0;
519
520 fail:
521         TALLOC_FREE(state);
522         return ENOMEM;
523 }
524
525 static void messaging_post_handler(struct tevent_context *ev,
526                                    struct tevent_immediate *ti,
527                                    void *private_data)
528 {
529         struct messaging_post_state *state = talloc_get_type_abort(
530                 private_data, struct messaging_post_state);
531         messaging_dispatch_rec(state->msg_ctx, state->rec);
532         TALLOC_FREE(state);
533 }
534
535 int messaging_send_iov_from(struct messaging_context *msg_ctx,
536                             struct server_id src, struct server_id dst,
537                             uint32_t msg_type,
538                             const struct iovec *iov, int iovlen,
539                             const int *fds, size_t num_fds)
540 {
541         int ret;
542         uint8_t hdr[MESSAGE_HDR_LENGTH];
543         struct iovec iov2[iovlen+1];
544
545         if (server_id_is_disconnected(&dst)) {
546                 return EINVAL;
547         }
548
549         if (num_fds > INT8_MAX) {
550                 return EINVAL;
551         }
552
553         if (dst.vnn != msg_ctx->id.vnn) {
554                 if (num_fds > 0) {
555                         return ENOSYS;
556                 }
557
558                 ret = msg_ctx->remote->send_fn(src, dst,
559                                                msg_type, iov, iovlen,
560                                                NULL, 0,
561                                                msg_ctx->remote);
562                 return ret;
563         }
564
565         if (server_id_equal(&dst, &msg_ctx->id)) {
566                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
567                                           iov, iovlen, fds, num_fds);
568                 return ret;
569         }
570
571         message_hdr_put(hdr, msg_type, src, dst);
572         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
573         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
574
575         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
576
577         if (ret == EACCES) {
578                 become_root();
579                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
580                                          fds, num_fds);
581                 unbecome_root();
582         }
583
584         return ret;
585 }
586
587 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
588                             struct server_id dst, uint32_t msg_type,
589                             const struct iovec *iov, int iovlen,
590                             const int *fds, size_t num_fds)
591 {
592         int ret;
593
594         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
595                                       iov, iovlen, fds, num_fds);
596         if (ret != 0) {
597                 return map_nt_error_from_unix(ret);
598         }
599         return NT_STATUS_OK;
600 }
601
602 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
603                                                struct messaging_rec *rec)
604 {
605         struct messaging_rec *result;
606         size_t fds_size = sizeof(int64_t) * rec->num_fds;
607         size_t payload_len;
608
609         payload_len = rec->buf.length + fds_size;
610         if (payload_len < rec->buf.length) {
611                 /* overflow */
612                 return NULL;
613         }
614
615         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
616                                       payload_len);
617         if (result == NULL) {
618                 return NULL;
619         }
620         *result = *rec;
621
622         /* Doesn't fail, see talloc_pooled_object */
623
624         result->buf.data = talloc_memdup(result, rec->buf.data,
625                                          rec->buf.length);
626
627         result->fds = NULL;
628         if (result->num_fds > 0) {
629                 result->fds = talloc_memdup(result, rec->fds, fds_size);
630         }
631
632         return result;
633 }
634
635 struct messaging_filtered_read_state {
636         struct tevent_context *ev;
637         struct messaging_context *msg_ctx;
638         void *tevent_handle;
639
640         bool (*filter)(struct messaging_rec *rec, void *private_data);
641         void *private_data;
642
643         struct messaging_rec *rec;
644 };
645
646 static void messaging_filtered_read_cleanup(struct tevent_req *req,
647                                             enum tevent_req_state req_state);
648
649 struct tevent_req *messaging_filtered_read_send(
650         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
651         struct messaging_context *msg_ctx,
652         bool (*filter)(struct messaging_rec *rec, void *private_data),
653         void *private_data)
654 {
655         struct tevent_req *req;
656         struct messaging_filtered_read_state *state;
657         size_t new_waiters_len;
658
659         req = tevent_req_create(mem_ctx, &state,
660                                 struct messaging_filtered_read_state);
661         if (req == NULL) {
662                 return NULL;
663         }
664         state->ev = ev;
665         state->msg_ctx = msg_ctx;
666         state->filter = filter;
667         state->private_data = private_data;
668
669         /*
670          * We have to defer the callback here, as we might be called from
671          * within a different tevent_context than state->ev
672          */
673         tevent_req_defer_callback(req, state->ev);
674
675         state->tevent_handle = messaging_dgm_register_tevent_context(
676                 state, ev);
677         if (tevent_req_nomem(state->tevent_handle, req)) {
678                 return tevent_req_post(req, ev);
679         }
680
681         /*
682          * We add ourselves to the "new_waiters" array, not the "waiters"
683          * array. If we are called from within messaging_read_done,
684          * messaging_dispatch_rec will be in an active for-loop on
685          * "waiters". We must be careful not to mess with this array, because
686          * it could mean that a single event is being delivered twice.
687          */
688
689         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
690
691         if (new_waiters_len == msg_ctx->num_new_waiters) {
692                 struct tevent_req **tmp;
693
694                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
695                                      struct tevent_req *, new_waiters_len+1);
696                 if (tevent_req_nomem(tmp, req)) {
697                         return tevent_req_post(req, ev);
698                 }
699                 msg_ctx->new_waiters = tmp;
700         }
701
702         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
703         msg_ctx->num_new_waiters += 1;
704         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
705
706         return req;
707 }
708
709 static void messaging_filtered_read_cleanup(struct tevent_req *req,
710                                             enum tevent_req_state req_state)
711 {
712         struct messaging_filtered_read_state *state = tevent_req_data(
713                 req, struct messaging_filtered_read_state);
714         struct messaging_context *msg_ctx = state->msg_ctx;
715         unsigned i;
716
717         tevent_req_set_cleanup_fn(req, NULL);
718
719         TALLOC_FREE(state->tevent_handle);
720
721         /*
722          * Just set the [new_]waiters entry to NULL, be careful not to mess
723          * with the other "waiters" array contents. We are often called from
724          * within "messaging_dispatch_rec", which loops over
725          * "waiters". Messing with the "waiters" array will mess up that
726          * for-loop.
727          */
728
729         for (i=0; i<msg_ctx->num_waiters; i++) {
730                 if (msg_ctx->waiters[i] == req) {
731                         msg_ctx->waiters[i] = NULL;
732                         return;
733                 }
734         }
735
736         for (i=0; i<msg_ctx->num_new_waiters; i++) {
737                 if (msg_ctx->new_waiters[i] == req) {
738                         msg_ctx->new_waiters[i] = NULL;
739                         return;
740                 }
741         }
742 }
743
744 static void messaging_filtered_read_done(struct tevent_req *req,
745                                          struct messaging_rec *rec)
746 {
747         struct messaging_filtered_read_state *state = tevent_req_data(
748                 req, struct messaging_filtered_read_state);
749
750         state->rec = messaging_rec_dup(state, rec);
751         if (tevent_req_nomem(state->rec, req)) {
752                 return;
753         }
754         tevent_req_done(req);
755 }
756
757 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
758                                  struct messaging_rec **presult)
759 {
760         struct messaging_filtered_read_state *state = tevent_req_data(
761                 req, struct messaging_filtered_read_state);
762         int err;
763
764         if (tevent_req_is_unix_error(req, &err)) {
765                 tevent_req_received(req);
766                 return err;
767         }
768         if (presult != NULL) {
769                 *presult = talloc_move(mem_ctx, &state->rec);
770         }
771         return 0;
772 }
773
774 struct messaging_read_state {
775         uint32_t msg_type;
776         struct messaging_rec *rec;
777 };
778
779 static bool messaging_read_filter(struct messaging_rec *rec,
780                                   void *private_data);
781 static void messaging_read_done(struct tevent_req *subreq);
782
783 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
784                                        struct tevent_context *ev,
785                                        struct messaging_context *msg,
786                                        uint32_t msg_type)
787 {
788         struct tevent_req *req, *subreq;
789         struct messaging_read_state *state;
790
791         req = tevent_req_create(mem_ctx, &state,
792                                 struct messaging_read_state);
793         if (req == NULL) {
794                 return NULL;
795         }
796         state->msg_type = msg_type;
797
798         subreq = messaging_filtered_read_send(state, ev, msg,
799                                               messaging_read_filter, state);
800         if (tevent_req_nomem(subreq, req)) {
801                 return tevent_req_post(req, ev);
802         }
803         tevent_req_set_callback(subreq, messaging_read_done, req);
804         return req;
805 }
806
807 static bool messaging_read_filter(struct messaging_rec *rec,
808                                   void *private_data)
809 {
810         struct messaging_read_state *state = talloc_get_type_abort(
811                 private_data, struct messaging_read_state);
812
813         if (rec->num_fds != 0) {
814                 return false;
815         }
816
817         return rec->msg_type == state->msg_type;
818 }
819
820 static void messaging_read_done(struct tevent_req *subreq)
821 {
822         struct tevent_req *req = tevent_req_callback_data(
823                 subreq, struct tevent_req);
824         struct messaging_read_state *state = tevent_req_data(
825                 req, struct messaging_read_state);
826         int ret;
827
828         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
829         TALLOC_FREE(subreq);
830         if (tevent_req_error(req, ret)) {
831                 return;
832         }
833         tevent_req_done(req);
834 }
835
836 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
837                         struct messaging_rec **presult)
838 {
839         struct messaging_read_state *state = tevent_req_data(
840                 req, struct messaging_read_state);
841         int err;
842
843         if (tevent_req_is_unix_error(req, &err)) {
844                 return err;
845         }
846         if (presult != NULL) {
847                 *presult = talloc_move(mem_ctx, &state->rec);
848         }
849         return 0;
850 }
851
852 struct messaging_handler_state {
853         struct tevent_context *ev;
854         struct messaging_context *msg_ctx;
855         uint32_t msg_type;
856         bool (*handler)(struct messaging_context *msg_ctx,
857                         struct messaging_rec **rec, void *private_data);
858         void *private_data;
859 };
860
861 static void messaging_handler_got_msg(struct tevent_req *subreq);
862
863 struct tevent_req *messaging_handler_send(
864         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
865         struct messaging_context *msg_ctx, uint32_t msg_type,
866         bool (*handler)(struct messaging_context *msg_ctx,
867                         struct messaging_rec **rec, void *private_data),
868         void *private_data)
869 {
870         struct tevent_req *req, *subreq;
871         struct messaging_handler_state *state;
872
873         req = tevent_req_create(mem_ctx, &state,
874                                 struct messaging_handler_state);
875         if (req == NULL) {
876                 return NULL;
877         }
878         state->ev = ev;
879         state->msg_ctx = msg_ctx;
880         state->msg_type = msg_type;
881         state->handler = handler;
882         state->private_data = private_data;
883
884         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
885                                      state->msg_type);
886         if (tevent_req_nomem(subreq, req)) {
887                 return tevent_req_post(req, ev);
888         }
889         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
890         return req;
891 }
892
893 static void messaging_handler_got_msg(struct tevent_req *subreq)
894 {
895         struct tevent_req *req = tevent_req_callback_data(
896                 subreq, struct tevent_req);
897         struct messaging_handler_state *state = tevent_req_data(
898                 req, struct messaging_handler_state);
899         struct messaging_rec *rec;
900         int ret;
901         bool ok;
902
903         ret = messaging_read_recv(subreq, state, &rec);
904         TALLOC_FREE(subreq);
905         if (tevent_req_error(req, ret)) {
906                 return;
907         }
908
909         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
910                                      state->msg_type);
911         if (tevent_req_nomem(subreq, req)) {
912                 return;
913         }
914         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
915
916         ok = state->handler(state->msg_ctx, &rec, state->private_data);
917         TALLOC_FREE(rec);
918         if (ok) {
919                 /*
920                  * Next round
921                  */
922                 return;
923         }
924         TALLOC_FREE(subreq);
925         tevent_req_done(req);
926 }
927
928 int messaging_handler_recv(struct tevent_req *req)
929 {
930         return tevent_req_simple_recv_unix(req);
931 }
932
933 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
934 {
935         if (msg_ctx->num_new_waiters == 0) {
936                 return true;
937         }
938
939         if (talloc_array_length(msg_ctx->waiters) <
940             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
941                 struct tevent_req **tmp;
942                 tmp = talloc_realloc(
943                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
944                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
945                 if (tmp == NULL) {
946                         DEBUG(1, ("%s: talloc failed\n", __func__));
947                         return false;
948                 }
949                 msg_ctx->waiters = tmp;
950         }
951
952         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
953                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
954
955         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
956         msg_ctx->num_new_waiters = 0;
957
958         return true;
959 }
960
961 /*
962   Dispatch one messaging_rec
963 */
964 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
965                                    struct messaging_rec *rec)
966 {
967         struct messaging_callback *cb, *next;
968         unsigned i;
969         size_t j;
970
971         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
972                 next = cb->next;
973                 if (cb->msg_type != rec->msg_type) {
974                         continue;
975                 }
976
977                 /*
978                  * the old style callbacks don't support fd passing
979                  */
980                 for (j=0; j < rec->num_fds; j++) {
981                         int fd = rec->fds[j];
982                         close(fd);
983                 }
984                 rec->num_fds = 0;
985                 rec->fds = NULL;
986
987                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
988                        rec->src, &rec->buf);
989
990                 /*
991                  * we continue looking for matching messages after finding
992                  * one. This matters for subsystems like the internal notify
993                  * code which register more than one handler for the same
994                  * message type
995                  */
996         }
997
998         if (!messaging_append_new_waiters(msg_ctx)) {
999                 for (j=0; j < rec->num_fds; j++) {
1000                         int fd = rec->fds[j];
1001                         close(fd);
1002                 }
1003                 rec->num_fds = 0;
1004                 rec->fds = NULL;
1005                 return;
1006         }
1007
1008         i = 0;
1009         while (i < msg_ctx->num_waiters) {
1010                 struct tevent_req *req;
1011                 struct messaging_filtered_read_state *state;
1012
1013                 req = msg_ctx->waiters[i];
1014                 if (req == NULL) {
1015                         /*
1016                          * This got cleaned up. In the meantime,
1017                          * move everything down one. We need
1018                          * to keep the order of waiters, as
1019                          * other code may depend on this.
1020                          */
1021                         if (i < msg_ctx->num_waiters - 1) {
1022                                 memmove(&msg_ctx->waiters[i],
1023                                         &msg_ctx->waiters[i+1],
1024                                         sizeof(struct tevent_req *) *
1025                                             (msg_ctx->num_waiters - i - 1));
1026                         }
1027                         msg_ctx->num_waiters -= 1;
1028                         continue;
1029                 }
1030
1031                 state = tevent_req_data(
1032                         req, struct messaging_filtered_read_state);
1033                 if (state->filter(rec, state->private_data)) {
1034                         messaging_filtered_read_done(req, rec);
1035
1036                         /*
1037                          * Only the first one gets the fd-array
1038                          */
1039                         rec->num_fds = 0;
1040                         rec->fds = NULL;
1041                 }
1042
1043                 i += 1;
1044         }
1045
1046         /*
1047          * If the fd-array isn't used, just close it.
1048          */
1049         for (j=0; j < rec->num_fds; j++) {
1050                 int fd = rec->fds[j];
1051                 close(fd);
1052         }
1053         rec->num_fds = 0;
1054         rec->fds = NULL;
1055 }
1056
1057 static int mess_parent_dgm_cleanup(void *private_data);
1058 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1059
1060 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1061 {
1062         struct tevent_req *req;
1063
1064         req = background_job_send(
1065                 msg, msg->event_ctx, msg, NULL, 0,
1066                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1067                             60*15),
1068                 mess_parent_dgm_cleanup, msg);
1069         if (req == NULL) {
1070                 return false;
1071         }
1072         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1073         return true;
1074 }
1075
1076 static int mess_parent_dgm_cleanup(void *private_data)
1077 {
1078         int ret;
1079
1080         ret = messaging_dgm_wipe();
1081         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1082                    ret ? strerror(ret) : "ok"));
1083         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1084                            60*15);
1085 }
1086
1087 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1088 {
1089         struct messaging_context *msg = tevent_req_callback_data(
1090                 req, struct messaging_context);
1091         NTSTATUS status;
1092
1093         status = background_job_recv(req);
1094         TALLOC_FREE(req);
1095         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1096                   nt_errstr(status)));
1097
1098         req = background_job_send(
1099                 msg, msg->event_ctx, msg, NULL, 0,
1100                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1101                             60*15),
1102                 mess_parent_dgm_cleanup, msg);
1103         if (req == NULL) {
1104                 DEBUG(1, ("background_job_send failed\n"));
1105                 return;
1106         }
1107         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1108 }
1109
1110 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1111 {
1112         int ret;
1113
1114         if (pid == 0) {
1115                 ret = messaging_dgm_wipe();
1116         } else {
1117                 ret = messaging_dgm_cleanup(pid);
1118         }
1119
1120         return ret;
1121 }
1122
1123 struct tevent_context *messaging_tevent_context(
1124         struct messaging_context *msg_ctx)
1125 {
1126         return msg_ctx->event_ctx;
1127 }
1128
1129 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1130 {
1131         return msg_ctx->names_db;
1132 }
1133
1134 /** @} **/