messaging: With dgm_ref, don't destroy the dgm ctx
[nivanova/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 /****************************************************************************
87  A useful function for testing the message system.
88 ****************************************************************************/
89
90 static void ping_message(struct messaging_context *msg_ctx,
91                          void *private_data,
92                          uint32_t msg_type,
93                          struct server_id src,
94                          DATA_BLOB *data)
95 {
96         struct server_id_buf idbuf;
97
98         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
99                   server_id_str_buf(src, &idbuf), (int)data->length,
100                   data->data ? (char *)data->data : ""));
101
102         messaging_send(msg_ctx, src, MSG_PONG, data);
103 }
104
105 /****************************************************************************
106  Register/replace a dispatch function for a particular message type.
107  JRA changed Dec 13 2006. Only one message handler now permitted per type.
108  *NOTE*: Dispatch functions must be able to cope with incoming
109  messages on an *odd* byte boundary.
110 ****************************************************************************/
111
112 struct msg_all {
113         struct messaging_context *msg_ctx;
114         int msg_type;
115         uint32_t msg_flag;
116         const void *buf;
117         size_t len;
118         int n_sent;
119 };
120
121 /****************************************************************************
122  Send one of the messages for the broadcast.
123 ****************************************************************************/
124
125 static int traverse_fn(struct db_record *rec, const struct server_id *id,
126                        uint32_t msg_flags, void *state)
127 {
128         struct msg_all *msg_all = (struct msg_all *)state;
129         NTSTATUS status;
130
131         /* Don't send if the receiver hasn't registered an interest. */
132
133         if((msg_flags & msg_all->msg_flag) == 0) {
134                 return 0;
135         }
136
137         /* If the msg send fails because the pid was not found (i.e. smbd died), 
138          * the msg has already been deleted from the messages.tdb.*/
139
140         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
141                                     (const uint8_t *)msg_all->buf, msg_all->len);
142
143         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
144                 struct server_id_buf idbuf;
145
146                 /*
147                  * If the pid was not found delete the entry from
148                  * serverid.tdb
149                  */
150
151                 DEBUG(2, ("pid %s doesn't exist\n",
152                           server_id_str_buf(*id, &idbuf)));
153
154                 dbwrap_record_delete(rec);
155         }
156         msg_all->n_sent++;
157         return 0;
158 }
159
160 /**
161  * Send a message to all smbd processes.
162  *
163  * It isn't very efficient, but should be OK for the sorts of
164  * applications that use it. When we need efficient broadcast we can add
165  * it.
166  *
167  * @param n_sent Set to the number of messages sent.  This should be
168  * equal to the number of processes, but be careful for races.
169  *
170  * @retval True for success.
171  **/
172 bool message_send_all(struct messaging_context *msg_ctx,
173                       int msg_type,
174                       const void *buf, size_t len,
175                       int *n_sent)
176 {
177         struct msg_all msg_all;
178
179         msg_all.msg_type = msg_type;
180         if (msg_type < 0x100) {
181                 msg_all.msg_flag = FLAG_MSG_GENERAL;
182         } else if (msg_type > 0x100 && msg_type < 0x200) {
183                 msg_all.msg_flag = FLAG_MSG_NMBD;
184         } else if (msg_type > 0x200 && msg_type < 0x300) {
185                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186         } else if (msg_type > 0x300 && msg_type < 0x400) {
187                 msg_all.msg_flag = FLAG_MSG_SMBD;
188         } else if (msg_type > 0x400 && msg_type < 0x600) {
189                 msg_all.msg_flag = FLAG_MSG_WINBIND;
190         } else if (msg_type > 4000 && msg_type < 5000) {
191                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192         } else {
193                 return false;
194         }
195
196         msg_all.buf = buf;
197         msg_all.len = len;
198         msg_all.n_sent = 0;
199         msg_all.msg_ctx = msg_ctx;
200
201         serverid_traverse(traverse_fn, &msg_all);
202         if (n_sent)
203                 *n_sent = msg_all.n_sent;
204         return true;
205 }
206
207 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
208                               int *fds, size_t num_fds,
209                               void *private_data)
210 {
211         struct messaging_context *msg_ctx = talloc_get_type_abort(
212                 private_data, struct messaging_context);
213         uint8_t hdr[MESSAGE_HDR_LENGTH];
214         struct server_id_buf idbuf;
215         struct messaging_rec rec;
216         int64_t fds64[MIN(num_fds, INT8_MAX)];
217         size_t i;
218
219         if (msg_len < sizeof(hdr)) {
220                 for (i=0; i < num_fds; i++) {
221                         close(fds[i]);
222                 }
223                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
224                 return;
225         }
226
227         if (num_fds > INT8_MAX) {
228                 for (i=0; i < num_fds; i++) {
229                         close(fds[i]);
230                 }
231                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
232                 return;
233         }
234
235         /*
236          * "consume" the fds by copying them and setting
237          * the original variable to -1
238          */
239         for (i=0; i < num_fds; i++) {
240                 fds64[i] = fds[i];
241                 fds[i] = -1;
242         }
243
244         rec = (struct messaging_rec) {
245                 .msg_version = MESSAGE_VERSION,
246                 .buf.data = discard_const_p(uint8_t, msg) + sizeof(hdr),
247                 .buf.length = msg_len - sizeof(hdr),
248                 .num_fds = num_fds,
249                 .fds = fds64,
250         };
251
252         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
253
254         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
255                    __func__, (unsigned)rec.msg_type,
256                    (unsigned)rec.buf.length,
257                    (unsigned)num_fds,
258                    server_id_str_buf(rec.src, &idbuf)));
259
260         messaging_dispatch_rec(msg_ctx, &rec);
261 }
262
263 static int messaging_context_destructor(struct messaging_context *ctx)
264 {
265         unsigned i;
266
267         for (i=0; i<ctx->num_new_waiters; i++) {
268                 if (ctx->new_waiters[i] != NULL) {
269                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
270                         ctx->new_waiters[i] = NULL;
271                 }
272         }
273         for (i=0; i<ctx->num_waiters; i++) {
274                 if (ctx->waiters[i] != NULL) {
275                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
276                         ctx->waiters[i] = NULL;
277                 }
278         }
279
280         return 0;
281 }
282
283 static const char *private_path(const char *name)
284 {
285         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
286 }
287
288 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
289                                          struct tevent_context *ev)
290 {
291         struct messaging_context *ctx;
292         NTSTATUS status;
293         int ret;
294         const char *lck_path;
295         const char *priv_path;
296         bool ok;
297
298         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
299                 return NULL;
300         }
301
302         ctx->id = procid_self();
303         ctx->event_ctx = ev;
304
305         sec_init();
306
307         lck_path = lock_path("msg");
308         if (lck_path == NULL) {
309                 TALLOC_FREE(ctx);
310                 return NULL;
311         }
312
313         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
314                                               0755);
315         if (!ok) {
316                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
317                            __func__, strerror(errno)));
318                 TALLOC_FREE(ctx);
319                 return NULL;
320         }
321
322         priv_path = private_path("sock");
323
324         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
325                                               0700);
326         if (!ok) {
327                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
328                            __func__, strerror(errno)));
329                 TALLOC_FREE(ctx);
330                 return NULL;
331         }
332
333         ctx->msg_dgm_ref = messaging_dgm_ref(
334                 ctx, ctx->event_ctx, ctx->id.unique_id,
335                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
336
337         if (ctx->msg_dgm_ref == NULL) {
338                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
339                 TALLOC_FREE(ctx);
340                 return NULL;
341         }
342
343         ctx->names_db = server_id_db_init(
344                 ctx, ctx->id, lp_lock_directory(), 0,
345                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
346         if (ctx->names_db == NULL) {
347                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
348                 TALLOC_FREE(ctx);
349                 return NULL;
350         }
351
352         talloc_set_destructor(ctx, messaging_context_destructor);
353
354         if (lp_clustering()) {
355                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
356
357                 if (!NT_STATUS_IS_OK(status)) {
358                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
359                                   nt_errstr(status)));
360                         TALLOC_FREE(ctx);
361                         return NULL;
362                 }
363         }
364         ctx->id.vnn = get_my_vnn();
365
366         messaging_register(ctx, NULL, MSG_PING, ping_message);
367
368         /* Register some debugging related messages */
369
370         register_msg_pool_usage(ctx);
371         register_dmalloc_msgs(ctx);
372         debug_register_msgs(ctx);
373
374         return ctx;
375 }
376
377 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
378 {
379         return msg_ctx->id;
380 }
381
382 /*
383  * re-init after a fork
384  */
385 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
386 {
387         NTSTATUS status;
388         int ret;
389
390         TALLOC_FREE(msg_ctx->msg_dgm_ref);
391
392         msg_ctx->id = procid_self();
393
394         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
395                 msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
396                 private_path("sock"), lock_path("msg"),
397                 messaging_recv_cb, msg_ctx, &ret);
398
399         if (msg_ctx->msg_dgm_ref == NULL) {
400                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
401                 return map_nt_error_from_unix(ret);
402         }
403
404         TALLOC_FREE(msg_ctx->remote);
405
406         if (lp_clustering()) {
407                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
408                                               &msg_ctx->remote);
409
410                 if (!NT_STATUS_IS_OK(status)) {
411                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
412                                   nt_errstr(status)));
413                         return status;
414                 }
415         }
416
417         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
418
419         return NT_STATUS_OK;
420 }
421
422
423 /*
424  * Register a dispatch function for a particular message type. Allow multiple
425  * registrants
426 */
427 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
428                             void *private_data,
429                             uint32_t msg_type,
430                             void (*fn)(struct messaging_context *msg,
431                                        void *private_data, 
432                                        uint32_t msg_type, 
433                                        struct server_id server_id,
434                                        DATA_BLOB *data))
435 {
436         struct messaging_callback *cb;
437
438         DEBUG(5, ("Registering messaging pointer for type %u - "
439                   "private_data=%p\n",
440                   (unsigned)msg_type, private_data));
441
442         /*
443          * Only one callback per type
444          */
445
446         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
447                 /* we allow a second registration of the same message
448                    type if it has a different private pointer. This is
449                    needed in, for example, the internal notify code,
450                    which creates a new notify context for each tree
451                    connect, and expects to receive messages to each of
452                    them. */
453                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
454                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
455                                   (unsigned)msg_type, private_data));
456                         cb->fn = fn;
457                         cb->private_data = private_data;
458                         return NT_STATUS_OK;
459                 }
460         }
461
462         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
463                 return NT_STATUS_NO_MEMORY;
464         }
465
466         cb->msg_type = msg_type;
467         cb->fn = fn;
468         cb->private_data = private_data;
469
470         DLIST_ADD(msg_ctx->callbacks, cb);
471         return NT_STATUS_OK;
472 }
473
474 /*
475   De-register the function for a particular message type.
476 */
477 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
478                           void *private_data)
479 {
480         struct messaging_callback *cb, *next;
481
482         for (cb = ctx->callbacks; cb; cb = next) {
483                 next = cb->next;
484                 if ((cb->msg_type == msg_type)
485                     && (cb->private_data == private_data)) {
486                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
487                                   (unsigned)msg_type, private_data));
488                         DLIST_REMOVE(ctx->callbacks, cb);
489                         TALLOC_FREE(cb);
490                 }
491         }
492 }
493
494 /*
495   Send a message to a particular server
496 */
497 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
498                         struct server_id server, uint32_t msg_type,
499                         const DATA_BLOB *data)
500 {
501         struct iovec iov;
502
503         iov.iov_base = data->data;
504         iov.iov_len = data->length;
505
506         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
507 }
508
509 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
510                             struct server_id server, uint32_t msg_type,
511                             const uint8_t *buf, size_t len)
512 {
513         DATA_BLOB blob = data_blob_const(buf, len);
514         return messaging_send(msg_ctx, server, msg_type, &blob);
515 }
516
517 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
518                                  struct server_id src, struct server_id dst,
519                                  uint32_t msg_type,
520                                  const struct iovec *iov, int iovlen,
521                                  const int *fds, size_t num_fds)
522 {
523         int ret;
524         uint8_t hdr[MESSAGE_HDR_LENGTH];
525         struct iovec iov2[iovlen+1];
526
527         if (server_id_is_disconnected(&dst)) {
528                 return NT_STATUS_INVALID_PARAMETER_MIX;
529         }
530
531         if (num_fds > INT8_MAX) {
532                 return NT_STATUS_INVALID_PARAMETER_MIX;
533         }
534
535         if (!procid_is_local(&dst)) {
536                 if (num_fds > 0) {
537                         return NT_STATUS_NOT_SUPPORTED;
538                 }
539
540                 ret = msg_ctx->remote->send_fn(src, dst,
541                                                msg_type, iov, iovlen,
542                                                NULL, 0,
543                                                msg_ctx->remote);
544                 if (ret != 0) {
545                         return map_nt_error_from_unix(ret);
546                 }
547                 return NT_STATUS_OK;
548         }
549
550         message_hdr_put(hdr, msg_type, src, dst);
551         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
552         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
553
554         become_root();
555         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
556         unbecome_root();
557
558         if (ret != 0) {
559                 return map_nt_error_from_unix(ret);
560         }
561         return NT_STATUS_OK;
562 }
563
564 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
565                             struct server_id dst, uint32_t msg_type,
566                             const struct iovec *iov, int iovlen,
567                             const int *fds, size_t num_fds)
568 {
569         return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
570                                        iov, iovlen, fds, num_fds);
571 }
572
573 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
574                                                struct messaging_rec *rec)
575 {
576         struct messaging_rec *result;
577         size_t fds_size = sizeof(int64_t) * rec->num_fds;
578
579         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
580                                       rec->buf.length + fds_size);
581         if (result == NULL) {
582                 return NULL;
583         }
584         *result = *rec;
585
586         /* Doesn't fail, see talloc_pooled_object */
587
588         result->buf.data = talloc_memdup(result, rec->buf.data,
589                                          rec->buf.length);
590
591         result->fds = NULL;
592         if (result->num_fds > 0) {
593                 result->fds = talloc_memdup(result, rec->fds, fds_size);
594         }
595
596         return result;
597 }
598
599 struct messaging_filtered_read_state {
600         struct tevent_context *ev;
601         struct messaging_context *msg_ctx;
602         void *tevent_handle;
603
604         bool (*filter)(struct messaging_rec *rec, void *private_data);
605         void *private_data;
606
607         struct messaging_rec *rec;
608 };
609
610 static void messaging_filtered_read_cleanup(struct tevent_req *req,
611                                             enum tevent_req_state req_state);
612
613 struct tevent_req *messaging_filtered_read_send(
614         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
615         struct messaging_context *msg_ctx,
616         bool (*filter)(struct messaging_rec *rec, void *private_data),
617         void *private_data)
618 {
619         struct tevent_req *req;
620         struct messaging_filtered_read_state *state;
621         size_t new_waiters_len;
622
623         req = tevent_req_create(mem_ctx, &state,
624                                 struct messaging_filtered_read_state);
625         if (req == NULL) {
626                 return NULL;
627         }
628         state->ev = ev;
629         state->msg_ctx = msg_ctx;
630         state->filter = filter;
631         state->private_data = private_data;
632
633         /*
634          * We have to defer the callback here, as we might be called from
635          * within a different tevent_context than state->ev
636          */
637         tevent_req_defer_callback(req, state->ev);
638
639         state->tevent_handle = messaging_dgm_register_tevent_context(
640                 state, ev);
641         if (tevent_req_nomem(state, req)) {
642                 return tevent_req_post(req, ev);
643         }
644
645         /*
646          * We add ourselves to the "new_waiters" array, not the "waiters"
647          * array. If we are called from within messaging_read_done,
648          * messaging_dispatch_rec will be in an active for-loop on
649          * "waiters". We must be careful not to mess with this array, because
650          * it could mean that a single event is being delivered twice.
651          */
652
653         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
654
655         if (new_waiters_len == msg_ctx->num_new_waiters) {
656                 struct tevent_req **tmp;
657
658                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
659                                      struct tevent_req *, new_waiters_len+1);
660                 if (tevent_req_nomem(tmp, req)) {
661                         return tevent_req_post(req, ev);
662                 }
663                 msg_ctx->new_waiters = tmp;
664         }
665
666         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
667         msg_ctx->num_new_waiters += 1;
668         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
669
670         return req;
671 }
672
673 static void messaging_filtered_read_cleanup(struct tevent_req *req,
674                                             enum tevent_req_state req_state)
675 {
676         struct messaging_filtered_read_state *state = tevent_req_data(
677                 req, struct messaging_filtered_read_state);
678         struct messaging_context *msg_ctx = state->msg_ctx;
679         unsigned i;
680
681         tevent_req_set_cleanup_fn(req, NULL);
682
683         TALLOC_FREE(state->tevent_handle);
684
685         /*
686          * Just set the [new_]waiters entry to NULL, be careful not to mess
687          * with the other "waiters" array contents. We are often called from
688          * within "messaging_dispatch_rec", which loops over
689          * "waiters". Messing with the "waiters" array will mess up that
690          * for-loop.
691          */
692
693         for (i=0; i<msg_ctx->num_waiters; i++) {
694                 if (msg_ctx->waiters[i] == req) {
695                         msg_ctx->waiters[i] = NULL;
696                         return;
697                 }
698         }
699
700         for (i=0; i<msg_ctx->num_new_waiters; i++) {
701                 if (msg_ctx->new_waiters[i] == req) {
702                         msg_ctx->new_waiters[i] = NULL;
703                         return;
704                 }
705         }
706 }
707
708 static void messaging_filtered_read_done(struct tevent_req *req,
709                                          struct messaging_rec *rec)
710 {
711         struct messaging_filtered_read_state *state = tevent_req_data(
712                 req, struct messaging_filtered_read_state);
713
714         state->rec = messaging_rec_dup(state, rec);
715         if (tevent_req_nomem(state->rec, req)) {
716                 return;
717         }
718         tevent_req_done(req);
719 }
720
721 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
722                                  struct messaging_rec **presult)
723 {
724         struct messaging_filtered_read_state *state = tevent_req_data(
725                 req, struct messaging_filtered_read_state);
726         int err;
727
728         if (tevent_req_is_unix_error(req, &err)) {
729                 tevent_req_received(req);
730                 return err;
731         }
732         *presult = talloc_move(mem_ctx, &state->rec);
733         return 0;
734 }
735
736 struct messaging_read_state {
737         uint32_t msg_type;
738         struct messaging_rec *rec;
739 };
740
741 static bool messaging_read_filter(struct messaging_rec *rec,
742                                   void *private_data);
743 static void messaging_read_done(struct tevent_req *subreq);
744
745 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
746                                        struct tevent_context *ev,
747                                        struct messaging_context *msg,
748                                        uint32_t msg_type)
749 {
750         struct tevent_req *req, *subreq;
751         struct messaging_read_state *state;
752
753         req = tevent_req_create(mem_ctx, &state,
754                                 struct messaging_read_state);
755         if (req == NULL) {
756                 return NULL;
757         }
758         state->msg_type = msg_type;
759
760         subreq = messaging_filtered_read_send(state, ev, msg,
761                                               messaging_read_filter, state);
762         if (tevent_req_nomem(subreq, req)) {
763                 return tevent_req_post(req, ev);
764         }
765         tevent_req_set_callback(subreq, messaging_read_done, req);
766         return req;
767 }
768
769 static bool messaging_read_filter(struct messaging_rec *rec,
770                                   void *private_data)
771 {
772         struct messaging_read_state *state = talloc_get_type_abort(
773                 private_data, struct messaging_read_state);
774
775         if (rec->num_fds != 0) {
776                 return false;
777         }
778
779         return rec->msg_type == state->msg_type;
780 }
781
782 static void messaging_read_done(struct tevent_req *subreq)
783 {
784         struct tevent_req *req = tevent_req_callback_data(
785                 subreq, struct tevent_req);
786         struct messaging_read_state *state = tevent_req_data(
787                 req, struct messaging_read_state);
788         int ret;
789
790         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
791         TALLOC_FREE(subreq);
792         if (tevent_req_error(req, ret)) {
793                 return;
794         }
795         tevent_req_done(req);
796 }
797
798 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
799                         struct messaging_rec **presult)
800 {
801         struct messaging_read_state *state = tevent_req_data(
802                 req, struct messaging_read_state);
803         int err;
804
805         if (tevent_req_is_unix_error(req, &err)) {
806                 return err;
807         }
808         if (presult != NULL) {
809                 *presult = talloc_move(mem_ctx, &state->rec);
810         }
811         return 0;
812 }
813
814 struct messaging_handler_state {
815         struct tevent_context *ev;
816         struct messaging_context *msg_ctx;
817         uint32_t msg_type;
818         bool (*handler)(struct messaging_context *msg_ctx,
819                         struct messaging_rec **rec, void *private_data);
820         void *private_data;
821 };
822
823 static void messaging_handler_got_msg(struct tevent_req *subreq);
824
825 struct tevent_req *messaging_handler_send(
826         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
827         struct messaging_context *msg_ctx, uint32_t msg_type,
828         bool (*handler)(struct messaging_context *msg_ctx,
829                         struct messaging_rec **rec, void *private_data),
830         void *private_data)
831 {
832         struct tevent_req *req, *subreq;
833         struct messaging_handler_state *state;
834
835         req = tevent_req_create(mem_ctx, &state,
836                                 struct messaging_handler_state);
837         if (req == NULL) {
838                 return NULL;
839         }
840         state->ev = ev;
841         state->msg_ctx = msg_ctx;
842         state->msg_type = msg_type;
843         state->handler = handler;
844         state->private_data = private_data;
845
846         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
847                                      state->msg_type);
848         if (tevent_req_nomem(subreq, req)) {
849                 return tevent_req_post(req, ev);
850         }
851         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
852         return req;
853 }
854
855 static void messaging_handler_got_msg(struct tevent_req *subreq)
856 {
857         struct tevent_req *req = tevent_req_callback_data(
858                 subreq, struct tevent_req);
859         struct messaging_handler_state *state = tevent_req_data(
860                 req, struct messaging_handler_state);
861         struct messaging_rec *rec;
862         int ret;
863         bool ok;
864
865         ret = messaging_read_recv(subreq, state, &rec);
866         TALLOC_FREE(subreq);
867         if (tevent_req_error(req, ret)) {
868                 return;
869         }
870
871         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
872                                      state->msg_type);
873         if (tevent_req_nomem(subreq, req)) {
874                 return;
875         }
876         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
877
878         ok = state->handler(state->msg_ctx, &rec, state->private_data);
879         TALLOC_FREE(rec);
880         if (ok) {
881                 /*
882                  * Next round
883                  */
884                 return;
885         }
886         TALLOC_FREE(subreq);
887         tevent_req_done(req);
888 }
889
890 int messaging_handler_recv(struct tevent_req *req)
891 {
892         return tevent_req_simple_recv_unix(req);
893 }
894
895 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
896 {
897         if (msg_ctx->num_new_waiters == 0) {
898                 return true;
899         }
900
901         if (talloc_array_length(msg_ctx->waiters) <
902             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
903                 struct tevent_req **tmp;
904                 tmp = talloc_realloc(
905                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
906                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
907                 if (tmp == NULL) {
908                         DEBUG(1, ("%s: talloc failed\n", __func__));
909                         return false;
910                 }
911                 msg_ctx->waiters = tmp;
912         }
913
914         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
915                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
916
917         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
918         msg_ctx->num_new_waiters = 0;
919
920         return true;
921 }
922
923 /*
924   Dispatch one messaging_rec
925 */
926 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
927                             struct messaging_rec *rec)
928 {
929         struct messaging_callback *cb, *next;
930         unsigned i;
931         size_t j;
932
933         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
934                 next = cb->next;
935                 if (cb->msg_type != rec->msg_type) {
936                         continue;
937                 }
938
939                 /*
940                  * the old style callbacks don't support fd passing
941                  */
942                 for (j=0; j < rec->num_fds; j++) {
943                         int fd = rec->fds[j];
944                         close(fd);
945                 }
946                 rec->num_fds = 0;
947                 rec->fds = NULL;
948
949                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
950                        rec->src, &rec->buf);
951
952                 /*
953                  * we continue looking for matching messages after finding
954                  * one. This matters for subsystems like the internal notify
955                  * code which register more than one handler for the same
956                  * message type
957                  */
958         }
959
960         if (!messaging_append_new_waiters(msg_ctx)) {
961                 for (j=0; j < rec->num_fds; j++) {
962                         int fd = rec->fds[j];
963                         close(fd);
964                 }
965                 rec->num_fds = 0;
966                 rec->fds = NULL;
967                 return;
968         }
969
970         i = 0;
971         while (i < msg_ctx->num_waiters) {
972                 struct tevent_req *req;
973                 struct messaging_filtered_read_state *state;
974
975                 req = msg_ctx->waiters[i];
976                 if (req == NULL) {
977                         /*
978                          * This got cleaned up. In the meantime,
979                          * move everything down one. We need
980                          * to keep the order of waiters, as
981                          * other code may depend on this.
982                          */
983                         if (i < msg_ctx->num_waiters - 1) {
984                                 memmove(&msg_ctx->waiters[i],
985                                         &msg_ctx->waiters[i+1],
986                                         sizeof(struct tevent_req *) *
987                                             (msg_ctx->num_waiters - i - 1));
988                         }
989                         msg_ctx->num_waiters -= 1;
990                         continue;
991                 }
992
993                 state = tevent_req_data(
994                         req, struct messaging_filtered_read_state);
995                 if (state->filter(rec, state->private_data)) {
996                         messaging_filtered_read_done(req, rec);
997
998                         /*
999                          * Only the first one gets the fd-array
1000                          */
1001                         rec->num_fds = 0;
1002                         rec->fds = NULL;
1003                 }
1004
1005                 i += 1;
1006         }
1007
1008         /*
1009          * If the fd-array isn't used, just close it.
1010          */
1011         for (j=0; j < rec->num_fds; j++) {
1012                 int fd = rec->fds[j];
1013                 close(fd);
1014         }
1015         rec->num_fds = 0;
1016         rec->fds = NULL;
1017 }
1018
1019 static int mess_parent_dgm_cleanup(void *private_data);
1020 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1021
1022 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1023 {
1024         struct tevent_req *req;
1025
1026         req = background_job_send(
1027                 msg, msg->event_ctx, msg, NULL, 0,
1028                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1029                             60*15),
1030                 mess_parent_dgm_cleanup, msg);
1031         if (req == NULL) {
1032                 return false;
1033         }
1034         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1035         return true;
1036 }
1037
1038 static int mess_parent_dgm_cleanup(void *private_data)
1039 {
1040         int ret;
1041
1042         ret = messaging_dgm_wipe();
1043         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1044                    ret ? strerror(ret) : "ok"));
1045         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1046                            60*15);
1047 }
1048
1049 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1050 {
1051         struct messaging_context *msg = tevent_req_callback_data(
1052                 req, struct messaging_context);
1053         NTSTATUS status;
1054
1055         status = background_job_recv(req);
1056         TALLOC_FREE(req);
1057         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1058                   nt_errstr(status)));
1059
1060         req = background_job_send(
1061                 msg, msg->event_ctx, msg, NULL, 0,
1062                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1063                             60*15),
1064                 mess_parent_dgm_cleanup, msg);
1065         if (req == NULL) {
1066                 DEBUG(1, ("background_job_send failed\n"));
1067                 return;
1068         }
1069         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1070 }
1071
1072 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1073 {
1074         int ret;
1075
1076         if (pid == 0) {
1077                 ret = messaging_dgm_wipe();
1078         } else {
1079                 ret = messaging_dgm_cleanup(pid);
1080         }
1081
1082         return ret;
1083 }
1084
1085 struct tevent_context *messaging_tevent_context(
1086         struct messaging_context *msg_ctx)
1087 {
1088         return msg_ctx->event_ctx;
1089 }
1090
1091 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1092 {
1093         return msg_ctx->names_db;
1094 }
1095
1096 /** @} **/