lib: Only return "rec" on demand in messaging_filtered_read_recv
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
109                               int *fds, size_t num_fds,
110                               void *private_data)
111 {
112         struct messaging_context *msg_ctx = talloc_get_type_abort(
113                 private_data, struct messaging_context);
114         struct server_id_buf idbuf;
115         struct messaging_rec rec;
116         int64_t fds64[MIN(num_fds, INT8_MAX)];
117         size_t i;
118
119         if (msg_len < MESSAGE_HDR_LENGTH) {
120                 DBG_WARNING("message too short: %zu\n", msg_len);
121                 goto close_fail;
122         }
123
124         if (num_fds > INT8_MAX) {
125                 DBG_WARNING("too many fds: %zu\n", num_fds);
126                 goto close_fail;
127         }
128
129         /*
130          * "consume" the fds by copying them and setting
131          * the original variable to -1
132          */
133         for (i=0; i < num_fds; i++) {
134                 fds64[i] = fds[i];
135                 fds[i] = -1;
136         }
137
138         rec = (struct messaging_rec) {
139                 .msg_version = MESSAGE_VERSION,
140                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
141                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
142                 .num_fds = num_fds,
143                 .fds = fds64,
144         };
145
146         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
147
148         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
149                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
150                   server_id_str_buf(rec.src, &idbuf));
151
152         messaging_dispatch_rec(msg_ctx, &rec);
153         return;
154
155 close_fail:
156         for (i=0; i < num_fds; i++) {
157                 close(fds[i]);
158         }
159 }
160
161 static int messaging_context_destructor(struct messaging_context *ctx)
162 {
163         unsigned i;
164
165         for (i=0; i<ctx->num_new_waiters; i++) {
166                 if (ctx->new_waiters[i] != NULL) {
167                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
168                         ctx->new_waiters[i] = NULL;
169                 }
170         }
171         for (i=0; i<ctx->num_waiters; i++) {
172                 if (ctx->waiters[i] != NULL) {
173                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
174                         ctx->waiters[i] = NULL;
175                 }
176         }
177
178         return 0;
179 }
180
181 static const char *private_path(const char *name)
182 {
183         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
184 }
185
186 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
187                                          struct tevent_context *ev)
188 {
189         struct messaging_context *ctx;
190         int ret;
191         const char *lck_path;
192         const char *priv_path;
193         bool ok;
194
195         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
196                 return NULL;
197         }
198
199         ctx->id = (struct server_id) {
200                 .pid = getpid(), .vnn = NONCLUSTER_VNN
201         };
202
203         ctx->event_ctx = ev;
204
205         sec_init();
206
207         lck_path = lock_path("msg.lock");
208         if (lck_path == NULL) {
209                 TALLOC_FREE(ctx);
210                 return NULL;
211         }
212
213         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
214                                               0755);
215         if (!ok) {
216                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
217                            __func__, strerror(errno)));
218                 TALLOC_FREE(ctx);
219                 return NULL;
220         }
221
222         priv_path = private_path("msg.sock");
223         if (priv_path == NULL) {
224                 TALLOC_FREE(ctx);
225                 return NULL;
226         }
227
228         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
229                                               0700);
230         if (!ok) {
231                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
232                            __func__, strerror(errno)));
233                 TALLOC_FREE(ctx);
234                 return NULL;
235         }
236
237         ctx->msg_dgm_ref = messaging_dgm_ref(
238                 ctx, ctx->event_ctx, &ctx->id.unique_id,
239                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
240
241         if (ctx->msg_dgm_ref == NULL) {
242                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
243                 TALLOC_FREE(ctx);
244                 return NULL;
245         }
246
247         talloc_set_destructor(ctx, messaging_context_destructor);
248
249         if (lp_clustering()) {
250                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
251
252                 if (ret != 0) {
253                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
254                                   strerror(ret)));
255                         TALLOC_FREE(ctx);
256                         return NULL;
257                 }
258         }
259         ctx->id.vnn = get_my_vnn();
260
261         ctx->names_db = server_id_db_init(
262                 ctx, ctx->id, lp_lock_directory(), 0,
263                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
264         if (ctx->names_db == NULL) {
265                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
266                 TALLOC_FREE(ctx);
267                 return NULL;
268         }
269
270         messaging_register(ctx, NULL, MSG_PING, ping_message);
271
272         /* Register some debugging related messages */
273
274         register_msg_pool_usage(ctx);
275         register_dmalloc_msgs(ctx);
276         debug_register_msgs(ctx);
277
278         {
279                 struct server_id_buf tmp;
280                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
281         }
282
283         return ctx;
284 }
285
286 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
287 {
288         return msg_ctx->id;
289 }
290
291 /*
292  * re-init after a fork
293  */
294 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
295 {
296         int ret;
297         char *lck_path;
298
299         TALLOC_FREE(msg_ctx->msg_dgm_ref);
300
301         msg_ctx->id = (struct server_id) {
302                 .pid = getpid(), .vnn = msg_ctx->id.vnn
303         };
304
305         lck_path = lock_path("msg.lock");
306         if (lck_path == NULL) {
307                 return NT_STATUS_NO_MEMORY;
308         }
309
310         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
311                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
312                 private_path("msg.sock"), lck_path,
313                 messaging_recv_cb, msg_ctx, &ret);
314
315         if (msg_ctx->msg_dgm_ref == NULL) {
316                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
317                 return map_nt_error_from_unix(ret);
318         }
319
320         if (lp_clustering()) {
321                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
322                                              msg_ctx->remote);
323
324                 if (ret != 0) {
325                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
326                                   strerror(ret)));
327                         return map_nt_error_from_unix(ret);
328                 }
329         }
330
331         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
332
333         return NT_STATUS_OK;
334 }
335
336
337 /*
338  * Register a dispatch function for a particular message type. Allow multiple
339  * registrants
340 */
341 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
342                             void *private_data,
343                             uint32_t msg_type,
344                             void (*fn)(struct messaging_context *msg,
345                                        void *private_data, 
346                                        uint32_t msg_type, 
347                                        struct server_id server_id,
348                                        DATA_BLOB *data))
349 {
350         struct messaging_callback *cb;
351
352         DEBUG(5, ("Registering messaging pointer for type %u - "
353                   "private_data=%p\n",
354                   (unsigned)msg_type, private_data));
355
356         /*
357          * Only one callback per type
358          */
359
360         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
361                 /* we allow a second registration of the same message
362                    type if it has a different private pointer. This is
363                    needed in, for example, the internal notify code,
364                    which creates a new notify context for each tree
365                    connect, and expects to receive messages to each of
366                    them. */
367                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
368                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
369                                   (unsigned)msg_type, private_data));
370                         cb->fn = fn;
371                         cb->private_data = private_data;
372                         return NT_STATUS_OK;
373                 }
374         }
375
376         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
377                 return NT_STATUS_NO_MEMORY;
378         }
379
380         cb->msg_type = msg_type;
381         cb->fn = fn;
382         cb->private_data = private_data;
383
384         DLIST_ADD(msg_ctx->callbacks, cb);
385         return NT_STATUS_OK;
386 }
387
388 /*
389   De-register the function for a particular message type.
390 */
391 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
392                           void *private_data)
393 {
394         struct messaging_callback *cb, *next;
395
396         for (cb = ctx->callbacks; cb; cb = next) {
397                 next = cb->next;
398                 if ((cb->msg_type == msg_type)
399                     && (cb->private_data == private_data)) {
400                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
401                                   (unsigned)msg_type, private_data));
402                         DLIST_REMOVE(ctx->callbacks, cb);
403                         TALLOC_FREE(cb);
404                 }
405         }
406 }
407
408 /*
409   Send a message to a particular server
410 */
411 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
412                         struct server_id server, uint32_t msg_type,
413                         const DATA_BLOB *data)
414 {
415         struct iovec iov = {0};
416
417         if (data != NULL) {
418                 iov.iov_base = data->data;
419                 iov.iov_len = data->length;
420         };
421
422         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
423 }
424
425 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
426                             struct server_id server, uint32_t msg_type,
427                             const uint8_t *buf, size_t len)
428 {
429         DATA_BLOB blob = data_blob_const(buf, len);
430         return messaging_send(msg_ctx, server, msg_type, &blob);
431 }
432
433 int messaging_send_iov_from(struct messaging_context *msg_ctx,
434                             struct server_id src, struct server_id dst,
435                             uint32_t msg_type,
436                             const struct iovec *iov, int iovlen,
437                             const int *fds, size_t num_fds)
438 {
439         int ret;
440         uint8_t hdr[MESSAGE_HDR_LENGTH];
441         struct iovec iov2[iovlen+1];
442
443         if (server_id_is_disconnected(&dst)) {
444                 return EINVAL;
445         }
446
447         if (num_fds > INT8_MAX) {
448                 return EINVAL;
449         }
450
451         if (dst.vnn != msg_ctx->id.vnn) {
452                 if (num_fds > 0) {
453                         return ENOSYS;
454                 }
455
456                 ret = msg_ctx->remote->send_fn(src, dst,
457                                                msg_type, iov, iovlen,
458                                                NULL, 0,
459                                                msg_ctx->remote);
460                 return ret;
461         }
462
463         message_hdr_put(hdr, msg_type, src, dst);
464         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
465         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
466
467         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
468
469         if (ret == EACCES) {
470                 become_root();
471                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
472                                          fds, num_fds);
473                 unbecome_root();
474         }
475
476         return ret;
477 }
478
479 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
480                             struct server_id dst, uint32_t msg_type,
481                             const struct iovec *iov, int iovlen,
482                             const int *fds, size_t num_fds)
483 {
484         int ret;
485
486         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
487                                       iov, iovlen, fds, num_fds);
488         if (ret != 0) {
489                 return map_nt_error_from_unix(ret);
490         }
491         return NT_STATUS_OK;
492 }
493
494 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
495                                                struct messaging_rec *rec)
496 {
497         struct messaging_rec *result;
498         size_t fds_size = sizeof(int64_t) * rec->num_fds;
499
500         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
501                                       rec->buf.length + fds_size);
502         if (result == NULL) {
503                 return NULL;
504         }
505         *result = *rec;
506
507         /* Doesn't fail, see talloc_pooled_object */
508
509         result->buf.data = talloc_memdup(result, rec->buf.data,
510                                          rec->buf.length);
511
512         result->fds = NULL;
513         if (result->num_fds > 0) {
514                 result->fds = talloc_memdup(result, rec->fds, fds_size);
515         }
516
517         return result;
518 }
519
520 struct messaging_filtered_read_state {
521         struct tevent_context *ev;
522         struct messaging_context *msg_ctx;
523         void *tevent_handle;
524
525         bool (*filter)(struct messaging_rec *rec, void *private_data);
526         void *private_data;
527
528         struct messaging_rec *rec;
529 };
530
531 static void messaging_filtered_read_cleanup(struct tevent_req *req,
532                                             enum tevent_req_state req_state);
533
534 struct tevent_req *messaging_filtered_read_send(
535         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
536         struct messaging_context *msg_ctx,
537         bool (*filter)(struct messaging_rec *rec, void *private_data),
538         void *private_data)
539 {
540         struct tevent_req *req;
541         struct messaging_filtered_read_state *state;
542         size_t new_waiters_len;
543
544         req = tevent_req_create(mem_ctx, &state,
545                                 struct messaging_filtered_read_state);
546         if (req == NULL) {
547                 return NULL;
548         }
549         state->ev = ev;
550         state->msg_ctx = msg_ctx;
551         state->filter = filter;
552         state->private_data = private_data;
553
554         /*
555          * We have to defer the callback here, as we might be called from
556          * within a different tevent_context than state->ev
557          */
558         tevent_req_defer_callback(req, state->ev);
559
560         state->tevent_handle = messaging_dgm_register_tevent_context(
561                 state, ev);
562         if (tevent_req_nomem(state->tevent_handle, req)) {
563                 return tevent_req_post(req, ev);
564         }
565
566         /*
567          * We add ourselves to the "new_waiters" array, not the "waiters"
568          * array. If we are called from within messaging_read_done,
569          * messaging_dispatch_rec will be in an active for-loop on
570          * "waiters". We must be careful not to mess with this array, because
571          * it could mean that a single event is being delivered twice.
572          */
573
574         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
575
576         if (new_waiters_len == msg_ctx->num_new_waiters) {
577                 struct tevent_req **tmp;
578
579                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
580                                      struct tevent_req *, new_waiters_len+1);
581                 if (tevent_req_nomem(tmp, req)) {
582                         return tevent_req_post(req, ev);
583                 }
584                 msg_ctx->new_waiters = tmp;
585         }
586
587         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
588         msg_ctx->num_new_waiters += 1;
589         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
590
591         return req;
592 }
593
594 static void messaging_filtered_read_cleanup(struct tevent_req *req,
595                                             enum tevent_req_state req_state)
596 {
597         struct messaging_filtered_read_state *state = tevent_req_data(
598                 req, struct messaging_filtered_read_state);
599         struct messaging_context *msg_ctx = state->msg_ctx;
600         unsigned i;
601
602         tevent_req_set_cleanup_fn(req, NULL);
603
604         TALLOC_FREE(state->tevent_handle);
605
606         /*
607          * Just set the [new_]waiters entry to NULL, be careful not to mess
608          * with the other "waiters" array contents. We are often called from
609          * within "messaging_dispatch_rec", which loops over
610          * "waiters". Messing with the "waiters" array will mess up that
611          * for-loop.
612          */
613
614         for (i=0; i<msg_ctx->num_waiters; i++) {
615                 if (msg_ctx->waiters[i] == req) {
616                         msg_ctx->waiters[i] = NULL;
617                         return;
618                 }
619         }
620
621         for (i=0; i<msg_ctx->num_new_waiters; i++) {
622                 if (msg_ctx->new_waiters[i] == req) {
623                         msg_ctx->new_waiters[i] = NULL;
624                         return;
625                 }
626         }
627 }
628
629 static void messaging_filtered_read_done(struct tevent_req *req,
630                                          struct messaging_rec *rec)
631 {
632         struct messaging_filtered_read_state *state = tevent_req_data(
633                 req, struct messaging_filtered_read_state);
634
635         state->rec = messaging_rec_dup(state, rec);
636         if (tevent_req_nomem(state->rec, req)) {
637                 return;
638         }
639         tevent_req_done(req);
640 }
641
642 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
643                                  struct messaging_rec **presult)
644 {
645         struct messaging_filtered_read_state *state = tevent_req_data(
646                 req, struct messaging_filtered_read_state);
647         int err;
648
649         if (tevent_req_is_unix_error(req, &err)) {
650                 tevent_req_received(req);
651                 return err;
652         }
653         if (presult != NULL) {
654                 *presult = talloc_move(mem_ctx, &state->rec);
655         }
656         return 0;
657 }
658
659 struct messaging_read_state {
660         uint32_t msg_type;
661         struct messaging_rec *rec;
662 };
663
664 static bool messaging_read_filter(struct messaging_rec *rec,
665                                   void *private_data);
666 static void messaging_read_done(struct tevent_req *subreq);
667
668 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
669                                        struct tevent_context *ev,
670                                        struct messaging_context *msg,
671                                        uint32_t msg_type)
672 {
673         struct tevent_req *req, *subreq;
674         struct messaging_read_state *state;
675
676         req = tevent_req_create(mem_ctx, &state,
677                                 struct messaging_read_state);
678         if (req == NULL) {
679                 return NULL;
680         }
681         state->msg_type = msg_type;
682
683         subreq = messaging_filtered_read_send(state, ev, msg,
684                                               messaging_read_filter, state);
685         if (tevent_req_nomem(subreq, req)) {
686                 return tevent_req_post(req, ev);
687         }
688         tevent_req_set_callback(subreq, messaging_read_done, req);
689         return req;
690 }
691
692 static bool messaging_read_filter(struct messaging_rec *rec,
693                                   void *private_data)
694 {
695         struct messaging_read_state *state = talloc_get_type_abort(
696                 private_data, struct messaging_read_state);
697
698         if (rec->num_fds != 0) {
699                 return false;
700         }
701
702         return rec->msg_type == state->msg_type;
703 }
704
705 static void messaging_read_done(struct tevent_req *subreq)
706 {
707         struct tevent_req *req = tevent_req_callback_data(
708                 subreq, struct tevent_req);
709         struct messaging_read_state *state = tevent_req_data(
710                 req, struct messaging_read_state);
711         int ret;
712
713         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
714         TALLOC_FREE(subreq);
715         if (tevent_req_error(req, ret)) {
716                 return;
717         }
718         tevent_req_done(req);
719 }
720
721 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
722                         struct messaging_rec **presult)
723 {
724         struct messaging_read_state *state = tevent_req_data(
725                 req, struct messaging_read_state);
726         int err;
727
728         if (tevent_req_is_unix_error(req, &err)) {
729                 return err;
730         }
731         if (presult != NULL) {
732                 *presult = talloc_move(mem_ctx, &state->rec);
733         }
734         return 0;
735 }
736
737 struct messaging_handler_state {
738         struct tevent_context *ev;
739         struct messaging_context *msg_ctx;
740         uint32_t msg_type;
741         bool (*handler)(struct messaging_context *msg_ctx,
742                         struct messaging_rec **rec, void *private_data);
743         void *private_data;
744 };
745
746 static void messaging_handler_got_msg(struct tevent_req *subreq);
747
748 struct tevent_req *messaging_handler_send(
749         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
750         struct messaging_context *msg_ctx, uint32_t msg_type,
751         bool (*handler)(struct messaging_context *msg_ctx,
752                         struct messaging_rec **rec, void *private_data),
753         void *private_data)
754 {
755         struct tevent_req *req, *subreq;
756         struct messaging_handler_state *state;
757
758         req = tevent_req_create(mem_ctx, &state,
759                                 struct messaging_handler_state);
760         if (req == NULL) {
761                 return NULL;
762         }
763         state->ev = ev;
764         state->msg_ctx = msg_ctx;
765         state->msg_type = msg_type;
766         state->handler = handler;
767         state->private_data = private_data;
768
769         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
770                                      state->msg_type);
771         if (tevent_req_nomem(subreq, req)) {
772                 return tevent_req_post(req, ev);
773         }
774         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
775         return req;
776 }
777
778 static void messaging_handler_got_msg(struct tevent_req *subreq)
779 {
780         struct tevent_req *req = tevent_req_callback_data(
781                 subreq, struct tevent_req);
782         struct messaging_handler_state *state = tevent_req_data(
783                 req, struct messaging_handler_state);
784         struct messaging_rec *rec;
785         int ret;
786         bool ok;
787
788         ret = messaging_read_recv(subreq, state, &rec);
789         TALLOC_FREE(subreq);
790         if (tevent_req_error(req, ret)) {
791                 return;
792         }
793
794         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
795                                      state->msg_type);
796         if (tevent_req_nomem(subreq, req)) {
797                 return;
798         }
799         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
800
801         ok = state->handler(state->msg_ctx, &rec, state->private_data);
802         TALLOC_FREE(rec);
803         if (ok) {
804                 /*
805                  * Next round
806                  */
807                 return;
808         }
809         TALLOC_FREE(subreq);
810         tevent_req_done(req);
811 }
812
813 int messaging_handler_recv(struct tevent_req *req)
814 {
815         return tevent_req_simple_recv_unix(req);
816 }
817
818 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
819 {
820         if (msg_ctx->num_new_waiters == 0) {
821                 return true;
822         }
823
824         if (talloc_array_length(msg_ctx->waiters) <
825             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
826                 struct tevent_req **tmp;
827                 tmp = talloc_realloc(
828                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
829                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
830                 if (tmp == NULL) {
831                         DEBUG(1, ("%s: talloc failed\n", __func__));
832                         return false;
833                 }
834                 msg_ctx->waiters = tmp;
835         }
836
837         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
838                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
839
840         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
841         msg_ctx->num_new_waiters = 0;
842
843         return true;
844 }
845
846 /*
847   Dispatch one messaging_rec
848 */
849 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
850                                    struct messaging_rec *rec)
851 {
852         struct messaging_callback *cb, *next;
853         unsigned i;
854         size_t j;
855
856         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
857                 next = cb->next;
858                 if (cb->msg_type != rec->msg_type) {
859                         continue;
860                 }
861
862                 /*
863                  * the old style callbacks don't support fd passing
864                  */
865                 for (j=0; j < rec->num_fds; j++) {
866                         int fd = rec->fds[j];
867                         close(fd);
868                 }
869                 rec->num_fds = 0;
870                 rec->fds = NULL;
871
872                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
873                        rec->src, &rec->buf);
874
875                 /*
876                  * we continue looking for matching messages after finding
877                  * one. This matters for subsystems like the internal notify
878                  * code which register more than one handler for the same
879                  * message type
880                  */
881         }
882
883         if (!messaging_append_new_waiters(msg_ctx)) {
884                 for (j=0; j < rec->num_fds; j++) {
885                         int fd = rec->fds[j];
886                         close(fd);
887                 }
888                 rec->num_fds = 0;
889                 rec->fds = NULL;
890                 return;
891         }
892
893         i = 0;
894         while (i < msg_ctx->num_waiters) {
895                 struct tevent_req *req;
896                 struct messaging_filtered_read_state *state;
897
898                 req = msg_ctx->waiters[i];
899                 if (req == NULL) {
900                         /*
901                          * This got cleaned up. In the meantime,
902                          * move everything down one. We need
903                          * to keep the order of waiters, as
904                          * other code may depend on this.
905                          */
906                         if (i < msg_ctx->num_waiters - 1) {
907                                 memmove(&msg_ctx->waiters[i],
908                                         &msg_ctx->waiters[i+1],
909                                         sizeof(struct tevent_req *) *
910                                             (msg_ctx->num_waiters - i - 1));
911                         }
912                         msg_ctx->num_waiters -= 1;
913                         continue;
914                 }
915
916                 state = tevent_req_data(
917                         req, struct messaging_filtered_read_state);
918                 if (state->filter(rec, state->private_data)) {
919                         messaging_filtered_read_done(req, rec);
920
921                         /*
922                          * Only the first one gets the fd-array
923                          */
924                         rec->num_fds = 0;
925                         rec->fds = NULL;
926                 }
927
928                 i += 1;
929         }
930
931         /*
932          * If the fd-array isn't used, just close it.
933          */
934         for (j=0; j < rec->num_fds; j++) {
935                 int fd = rec->fds[j];
936                 close(fd);
937         }
938         rec->num_fds = 0;
939         rec->fds = NULL;
940 }
941
942 static int mess_parent_dgm_cleanup(void *private_data);
943 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
944
945 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
946 {
947         struct tevent_req *req;
948
949         req = background_job_send(
950                 msg, msg->event_ctx, msg, NULL, 0,
951                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
952                             60*15),
953                 mess_parent_dgm_cleanup, msg);
954         if (req == NULL) {
955                 return false;
956         }
957         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
958         return true;
959 }
960
961 static int mess_parent_dgm_cleanup(void *private_data)
962 {
963         int ret;
964
965         ret = messaging_dgm_wipe();
966         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
967                    ret ? strerror(ret) : "ok"));
968         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
969                            60*15);
970 }
971
972 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
973 {
974         struct messaging_context *msg = tevent_req_callback_data(
975                 req, struct messaging_context);
976         NTSTATUS status;
977
978         status = background_job_recv(req);
979         TALLOC_FREE(req);
980         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
981                   nt_errstr(status)));
982
983         req = background_job_send(
984                 msg, msg->event_ctx, msg, NULL, 0,
985                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
986                             60*15),
987                 mess_parent_dgm_cleanup, msg);
988         if (req == NULL) {
989                 DEBUG(1, ("background_job_send failed\n"));
990                 return;
991         }
992         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
993 }
994
995 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
996 {
997         int ret;
998
999         if (pid == 0) {
1000                 ret = messaging_dgm_wipe();
1001         } else {
1002                 ret = messaging_dgm_cleanup(pid);
1003         }
1004
1005         return ret;
1006 }
1007
1008 struct tevent_context *messaging_tevent_context(
1009         struct messaging_context *msg_ctx)
1010 {
1011         return msg_ctx->event_ctx;
1012 }
1013
1014 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1015 {
1016         return msg_ctx->names_db;
1017 }
1018
1019 /** @} **/