lib: Avoid a "procid_is_local" call
[bbaumbach/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32_t msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         struct server_id_buf idbuf;
217         struct messaging_rec rec;
218         int64_t fds64[MIN(num_fds, INT8_MAX)];
219         size_t i;
220
221         if (msg_len < MESSAGE_HDR_LENGTH) {
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 goto close_fail;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228                 goto close_fail;
229         }
230
231         /*
232          * "consume" the fds by copying them and setting
233          * the original variable to -1
234          */
235         for (i=0; i < num_fds; i++) {
236                 fds64[i] = fds[i];
237                 fds[i] = -1;
238         }
239
240         rec = (struct messaging_rec) {
241                 .msg_version = MESSAGE_VERSION,
242                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244                 .num_fds = num_fds,
245                 .fds = fds64,
246         };
247
248         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
249
250         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251                    __func__, (unsigned)rec.msg_type,
252                    (unsigned)rec.buf.length,
253                    (unsigned)num_fds,
254                    server_id_str_buf(rec.src, &idbuf)));
255
256         messaging_dispatch_rec(msg_ctx, &rec);
257         return;
258
259 close_fail:
260         for (i=0; i < num_fds; i++) {
261                 close(fds[i]);
262         }
263 }
264
265 static int messaging_context_destructor(struct messaging_context *ctx)
266 {
267         unsigned i;
268
269         for (i=0; i<ctx->num_new_waiters; i++) {
270                 if (ctx->new_waiters[i] != NULL) {
271                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272                         ctx->new_waiters[i] = NULL;
273                 }
274         }
275         for (i=0; i<ctx->num_waiters; i++) {
276                 if (ctx->waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278                         ctx->waiters[i] = NULL;
279                 }
280         }
281
282         return 0;
283 }
284
285 static const char *private_path(const char *name)
286 {
287         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
288 }
289
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
291                                          struct tevent_context *ev)
292 {
293         struct messaging_context *ctx;
294         int ret;
295         const char *lck_path;
296         const char *priv_path;
297         bool ok;
298
299         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
300                 return NULL;
301         }
302
303         ctx->id = (struct server_id) {
304                 .pid = getpid(), .vnn = NONCLUSTER_VNN
305         };
306
307         ctx->event_ctx = ev;
308
309         sec_init();
310
311         lck_path = lock_path("msg.lock");
312         if (lck_path == NULL) {
313                 TALLOC_FREE(ctx);
314                 return NULL;
315         }
316
317         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
318                                               0755);
319         if (!ok) {
320                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
321                            __func__, strerror(errno)));
322                 TALLOC_FREE(ctx);
323                 return NULL;
324         }
325
326         priv_path = private_path("msg.sock");
327         if (priv_path == NULL) {
328                 TALLOC_FREE(ctx);
329                 return NULL;
330         }
331
332         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
333                                               0700);
334         if (!ok) {
335                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
336                            __func__, strerror(errno)));
337                 TALLOC_FREE(ctx);
338                 return NULL;
339         }
340
341         ctx->msg_dgm_ref = messaging_dgm_ref(
342                 ctx, ctx->event_ctx, &ctx->id.unique_id,
343                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
344
345         if (ctx->msg_dgm_ref == NULL) {
346                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
347                 TALLOC_FREE(ctx);
348                 return NULL;
349         }
350
351         talloc_set_destructor(ctx, messaging_context_destructor);
352
353         if (lp_clustering()) {
354                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
355
356                 if (ret != 0) {
357                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
358                                   strerror(ret)));
359                         TALLOC_FREE(ctx);
360                         return NULL;
361                 }
362         }
363         ctx->id.vnn = get_my_vnn();
364
365         ctx->names_db = server_id_db_init(
366                 ctx, ctx->id, lp_lock_directory(), 0,
367                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
368         if (ctx->names_db == NULL) {
369                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
370                 TALLOC_FREE(ctx);
371                 return NULL;
372         }
373
374         messaging_register(ctx, NULL, MSG_PING, ping_message);
375
376         /* Register some debugging related messages */
377
378         register_msg_pool_usage(ctx);
379         register_dmalloc_msgs(ctx);
380         debug_register_msgs(ctx);
381
382         {
383                 struct server_id_buf tmp;
384                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
385         }
386
387         return ctx;
388 }
389
390 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
391 {
392         return msg_ctx->id;
393 }
394
395 /*
396  * re-init after a fork
397  */
398 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
399 {
400         int ret;
401         char *lck_path;
402
403         TALLOC_FREE(msg_ctx->msg_dgm_ref);
404
405         msg_ctx->id = (struct server_id) {
406                 .pid = getpid(), .vnn = msg_ctx->id.vnn
407         };
408
409         lck_path = lock_path("msg.lock");
410         if (lck_path == NULL) {
411                 return NT_STATUS_NO_MEMORY;
412         }
413
414         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
415                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
416                 private_path("msg.sock"), lck_path,
417                 messaging_recv_cb, msg_ctx, &ret);
418
419         if (msg_ctx->msg_dgm_ref == NULL) {
420                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
421                 return map_nt_error_from_unix(ret);
422         }
423
424         if (lp_clustering()) {
425                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
426                                              msg_ctx->remote);
427
428                 if (ret != 0) {
429                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
430                                   strerror(ret)));
431                         return map_nt_error_from_unix(ret);
432                 }
433         }
434
435         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
436
437         return NT_STATUS_OK;
438 }
439
440
441 /*
442  * Register a dispatch function for a particular message type. Allow multiple
443  * registrants
444 */
445 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
446                             void *private_data,
447                             uint32_t msg_type,
448                             void (*fn)(struct messaging_context *msg,
449                                        void *private_data, 
450                                        uint32_t msg_type, 
451                                        struct server_id server_id,
452                                        DATA_BLOB *data))
453 {
454         struct messaging_callback *cb;
455
456         DEBUG(5, ("Registering messaging pointer for type %u - "
457                   "private_data=%p\n",
458                   (unsigned)msg_type, private_data));
459
460         /*
461          * Only one callback per type
462          */
463
464         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
465                 /* we allow a second registration of the same message
466                    type if it has a different private pointer. This is
467                    needed in, for example, the internal notify code,
468                    which creates a new notify context for each tree
469                    connect, and expects to receive messages to each of
470                    them. */
471                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
472                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
473                                   (unsigned)msg_type, private_data));
474                         cb->fn = fn;
475                         cb->private_data = private_data;
476                         return NT_STATUS_OK;
477                 }
478         }
479
480         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
481                 return NT_STATUS_NO_MEMORY;
482         }
483
484         cb->msg_type = msg_type;
485         cb->fn = fn;
486         cb->private_data = private_data;
487
488         DLIST_ADD(msg_ctx->callbacks, cb);
489         return NT_STATUS_OK;
490 }
491
492 /*
493   De-register the function for a particular message type.
494 */
495 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
496                           void *private_data)
497 {
498         struct messaging_callback *cb, *next;
499
500         for (cb = ctx->callbacks; cb; cb = next) {
501                 next = cb->next;
502                 if ((cb->msg_type == msg_type)
503                     && (cb->private_data == private_data)) {
504                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
505                                   (unsigned)msg_type, private_data));
506                         DLIST_REMOVE(ctx->callbacks, cb);
507                         TALLOC_FREE(cb);
508                 }
509         }
510 }
511
512 /*
513   Send a message to a particular server
514 */
515 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
516                         struct server_id server, uint32_t msg_type,
517                         const DATA_BLOB *data)
518 {
519         struct iovec iov;
520
521         iov.iov_base = data->data;
522         iov.iov_len = data->length;
523
524         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
525 }
526
527 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
528                             struct server_id server, uint32_t msg_type,
529                             const uint8_t *buf, size_t len)
530 {
531         DATA_BLOB blob = data_blob_const(buf, len);
532         return messaging_send(msg_ctx, server, msg_type, &blob);
533 }
534
535 int messaging_send_iov_from(struct messaging_context *msg_ctx,
536                             struct server_id src, struct server_id dst,
537                             uint32_t msg_type,
538                             const struct iovec *iov, int iovlen,
539                             const int *fds, size_t num_fds)
540 {
541         int ret;
542         uint8_t hdr[MESSAGE_HDR_LENGTH];
543         struct iovec iov2[iovlen+1];
544
545         if (server_id_is_disconnected(&dst)) {
546                 return EINVAL;
547         }
548
549         if (num_fds > INT8_MAX) {
550                 return EINVAL;
551         }
552
553         if (dst.vnn != msg_ctx->id.vnn) {
554                 if (num_fds > 0) {
555                         return ENOSYS;
556                 }
557
558                 ret = msg_ctx->remote->send_fn(src, dst,
559                                                msg_type, iov, iovlen,
560                                                NULL, 0,
561                                                msg_ctx->remote);
562                 return ret;
563         }
564
565         message_hdr_put(hdr, msg_type, src, dst);
566         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
567         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
568
569         become_root();
570         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
571         unbecome_root();
572
573         return ret;
574 }
575
576 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
577                             struct server_id dst, uint32_t msg_type,
578                             const struct iovec *iov, int iovlen,
579                             const int *fds, size_t num_fds)
580 {
581         int ret;
582
583         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
584                                       iov, iovlen, fds, num_fds);
585         if (ret != 0) {
586                 return map_nt_error_from_unix(ret);
587         }
588         return NT_STATUS_OK;
589 }
590
591 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
592                                                struct messaging_rec *rec)
593 {
594         struct messaging_rec *result;
595         size_t fds_size = sizeof(int64_t) * rec->num_fds;
596
597         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
598                                       rec->buf.length + fds_size);
599         if (result == NULL) {
600                 return NULL;
601         }
602         *result = *rec;
603
604         /* Doesn't fail, see talloc_pooled_object */
605
606         result->buf.data = talloc_memdup(result, rec->buf.data,
607                                          rec->buf.length);
608
609         result->fds = NULL;
610         if (result->num_fds > 0) {
611                 result->fds = talloc_memdup(result, rec->fds, fds_size);
612         }
613
614         return result;
615 }
616
617 struct messaging_filtered_read_state {
618         struct tevent_context *ev;
619         struct messaging_context *msg_ctx;
620         void *tevent_handle;
621
622         bool (*filter)(struct messaging_rec *rec, void *private_data);
623         void *private_data;
624
625         struct messaging_rec *rec;
626 };
627
628 static void messaging_filtered_read_cleanup(struct tevent_req *req,
629                                             enum tevent_req_state req_state);
630
631 struct tevent_req *messaging_filtered_read_send(
632         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
633         struct messaging_context *msg_ctx,
634         bool (*filter)(struct messaging_rec *rec, void *private_data),
635         void *private_data)
636 {
637         struct tevent_req *req;
638         struct messaging_filtered_read_state *state;
639         size_t new_waiters_len;
640
641         req = tevent_req_create(mem_ctx, &state,
642                                 struct messaging_filtered_read_state);
643         if (req == NULL) {
644                 return NULL;
645         }
646         state->ev = ev;
647         state->msg_ctx = msg_ctx;
648         state->filter = filter;
649         state->private_data = private_data;
650
651         /*
652          * We have to defer the callback here, as we might be called from
653          * within a different tevent_context than state->ev
654          */
655         tevent_req_defer_callback(req, state->ev);
656
657         state->tevent_handle = messaging_dgm_register_tevent_context(
658                 state, ev);
659         if (tevent_req_nomem(state->tevent_handle, req)) {
660                 return tevent_req_post(req, ev);
661         }
662
663         /*
664          * We add ourselves to the "new_waiters" array, not the "waiters"
665          * array. If we are called from within messaging_read_done,
666          * messaging_dispatch_rec will be in an active for-loop on
667          * "waiters". We must be careful not to mess with this array, because
668          * it could mean that a single event is being delivered twice.
669          */
670
671         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
672
673         if (new_waiters_len == msg_ctx->num_new_waiters) {
674                 struct tevent_req **tmp;
675
676                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
677                                      struct tevent_req *, new_waiters_len+1);
678                 if (tevent_req_nomem(tmp, req)) {
679                         return tevent_req_post(req, ev);
680                 }
681                 msg_ctx->new_waiters = tmp;
682         }
683
684         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
685         msg_ctx->num_new_waiters += 1;
686         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
687
688         return req;
689 }
690
691 static void messaging_filtered_read_cleanup(struct tevent_req *req,
692                                             enum tevent_req_state req_state)
693 {
694         struct messaging_filtered_read_state *state = tevent_req_data(
695                 req, struct messaging_filtered_read_state);
696         struct messaging_context *msg_ctx = state->msg_ctx;
697         unsigned i;
698
699         tevent_req_set_cleanup_fn(req, NULL);
700
701         TALLOC_FREE(state->tevent_handle);
702
703         /*
704          * Just set the [new_]waiters entry to NULL, be careful not to mess
705          * with the other "waiters" array contents. We are often called from
706          * within "messaging_dispatch_rec", which loops over
707          * "waiters". Messing with the "waiters" array will mess up that
708          * for-loop.
709          */
710
711         for (i=0; i<msg_ctx->num_waiters; i++) {
712                 if (msg_ctx->waiters[i] == req) {
713                         msg_ctx->waiters[i] = NULL;
714                         return;
715                 }
716         }
717
718         for (i=0; i<msg_ctx->num_new_waiters; i++) {
719                 if (msg_ctx->new_waiters[i] == req) {
720                         msg_ctx->new_waiters[i] = NULL;
721                         return;
722                 }
723         }
724 }
725
726 static void messaging_filtered_read_done(struct tevent_req *req,
727                                          struct messaging_rec *rec)
728 {
729         struct messaging_filtered_read_state *state = tevent_req_data(
730                 req, struct messaging_filtered_read_state);
731
732         state->rec = messaging_rec_dup(state, rec);
733         if (tevent_req_nomem(state->rec, req)) {
734                 return;
735         }
736         tevent_req_done(req);
737 }
738
739 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
740                                  struct messaging_rec **presult)
741 {
742         struct messaging_filtered_read_state *state = tevent_req_data(
743                 req, struct messaging_filtered_read_state);
744         int err;
745
746         if (tevent_req_is_unix_error(req, &err)) {
747                 tevent_req_received(req);
748                 return err;
749         }
750         *presult = talloc_move(mem_ctx, &state->rec);
751         return 0;
752 }
753
754 struct messaging_read_state {
755         uint32_t msg_type;
756         struct messaging_rec *rec;
757 };
758
759 static bool messaging_read_filter(struct messaging_rec *rec,
760                                   void *private_data);
761 static void messaging_read_done(struct tevent_req *subreq);
762
763 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
764                                        struct tevent_context *ev,
765                                        struct messaging_context *msg,
766                                        uint32_t msg_type)
767 {
768         struct tevent_req *req, *subreq;
769         struct messaging_read_state *state;
770
771         req = tevent_req_create(mem_ctx, &state,
772                                 struct messaging_read_state);
773         if (req == NULL) {
774                 return NULL;
775         }
776         state->msg_type = msg_type;
777
778         subreq = messaging_filtered_read_send(state, ev, msg,
779                                               messaging_read_filter, state);
780         if (tevent_req_nomem(subreq, req)) {
781                 return tevent_req_post(req, ev);
782         }
783         tevent_req_set_callback(subreq, messaging_read_done, req);
784         return req;
785 }
786
787 static bool messaging_read_filter(struct messaging_rec *rec,
788                                   void *private_data)
789 {
790         struct messaging_read_state *state = talloc_get_type_abort(
791                 private_data, struct messaging_read_state);
792
793         if (rec->num_fds != 0) {
794                 return false;
795         }
796
797         return rec->msg_type == state->msg_type;
798 }
799
800 static void messaging_read_done(struct tevent_req *subreq)
801 {
802         struct tevent_req *req = tevent_req_callback_data(
803                 subreq, struct tevent_req);
804         struct messaging_read_state *state = tevent_req_data(
805                 req, struct messaging_read_state);
806         int ret;
807
808         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
809         TALLOC_FREE(subreq);
810         if (tevent_req_error(req, ret)) {
811                 return;
812         }
813         tevent_req_done(req);
814 }
815
816 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
817                         struct messaging_rec **presult)
818 {
819         struct messaging_read_state *state = tevent_req_data(
820                 req, struct messaging_read_state);
821         int err;
822
823         if (tevent_req_is_unix_error(req, &err)) {
824                 return err;
825         }
826         if (presult != NULL) {
827                 *presult = talloc_move(mem_ctx, &state->rec);
828         }
829         return 0;
830 }
831
832 struct messaging_handler_state {
833         struct tevent_context *ev;
834         struct messaging_context *msg_ctx;
835         uint32_t msg_type;
836         bool (*handler)(struct messaging_context *msg_ctx,
837                         struct messaging_rec **rec, void *private_data);
838         void *private_data;
839 };
840
841 static void messaging_handler_got_msg(struct tevent_req *subreq);
842
843 struct tevent_req *messaging_handler_send(
844         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
845         struct messaging_context *msg_ctx, uint32_t msg_type,
846         bool (*handler)(struct messaging_context *msg_ctx,
847                         struct messaging_rec **rec, void *private_data),
848         void *private_data)
849 {
850         struct tevent_req *req, *subreq;
851         struct messaging_handler_state *state;
852
853         req = tevent_req_create(mem_ctx, &state,
854                                 struct messaging_handler_state);
855         if (req == NULL) {
856                 return NULL;
857         }
858         state->ev = ev;
859         state->msg_ctx = msg_ctx;
860         state->msg_type = msg_type;
861         state->handler = handler;
862         state->private_data = private_data;
863
864         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
865                                      state->msg_type);
866         if (tevent_req_nomem(subreq, req)) {
867                 return tevent_req_post(req, ev);
868         }
869         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
870         return req;
871 }
872
873 static void messaging_handler_got_msg(struct tevent_req *subreq)
874 {
875         struct tevent_req *req = tevent_req_callback_data(
876                 subreq, struct tevent_req);
877         struct messaging_handler_state *state = tevent_req_data(
878                 req, struct messaging_handler_state);
879         struct messaging_rec *rec;
880         int ret;
881         bool ok;
882
883         ret = messaging_read_recv(subreq, state, &rec);
884         TALLOC_FREE(subreq);
885         if (tevent_req_error(req, ret)) {
886                 return;
887         }
888
889         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
890                                      state->msg_type);
891         if (tevent_req_nomem(subreq, req)) {
892                 return;
893         }
894         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
895
896         ok = state->handler(state->msg_ctx, &rec, state->private_data);
897         TALLOC_FREE(rec);
898         if (ok) {
899                 /*
900                  * Next round
901                  */
902                 return;
903         }
904         TALLOC_FREE(subreq);
905         tevent_req_done(req);
906 }
907
908 int messaging_handler_recv(struct tevent_req *req)
909 {
910         return tevent_req_simple_recv_unix(req);
911 }
912
913 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
914 {
915         if (msg_ctx->num_new_waiters == 0) {
916                 return true;
917         }
918
919         if (talloc_array_length(msg_ctx->waiters) <
920             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
921                 struct tevent_req **tmp;
922                 tmp = talloc_realloc(
923                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
924                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
925                 if (tmp == NULL) {
926                         DEBUG(1, ("%s: talloc failed\n", __func__));
927                         return false;
928                 }
929                 msg_ctx->waiters = tmp;
930         }
931
932         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
933                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
934
935         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
936         msg_ctx->num_new_waiters = 0;
937
938         return true;
939 }
940
941 /*
942   Dispatch one messaging_rec
943 */
944 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
945                                    struct messaging_rec *rec)
946 {
947         struct messaging_callback *cb, *next;
948         unsigned i;
949         size_t j;
950
951         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
952                 next = cb->next;
953                 if (cb->msg_type != rec->msg_type) {
954                         continue;
955                 }
956
957                 /*
958                  * the old style callbacks don't support fd passing
959                  */
960                 for (j=0; j < rec->num_fds; j++) {
961                         int fd = rec->fds[j];
962                         close(fd);
963                 }
964                 rec->num_fds = 0;
965                 rec->fds = NULL;
966
967                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
968                        rec->src, &rec->buf);
969
970                 /*
971                  * we continue looking for matching messages after finding
972                  * one. This matters for subsystems like the internal notify
973                  * code which register more than one handler for the same
974                  * message type
975                  */
976         }
977
978         if (!messaging_append_new_waiters(msg_ctx)) {
979                 for (j=0; j < rec->num_fds; j++) {
980                         int fd = rec->fds[j];
981                         close(fd);
982                 }
983                 rec->num_fds = 0;
984                 rec->fds = NULL;
985                 return;
986         }
987
988         i = 0;
989         while (i < msg_ctx->num_waiters) {
990                 struct tevent_req *req;
991                 struct messaging_filtered_read_state *state;
992
993                 req = msg_ctx->waiters[i];
994                 if (req == NULL) {
995                         /*
996                          * This got cleaned up. In the meantime,
997                          * move everything down one. We need
998                          * to keep the order of waiters, as
999                          * other code may depend on this.
1000                          */
1001                         if (i < msg_ctx->num_waiters - 1) {
1002                                 memmove(&msg_ctx->waiters[i],
1003                                         &msg_ctx->waiters[i+1],
1004                                         sizeof(struct tevent_req *) *
1005                                             (msg_ctx->num_waiters - i - 1));
1006                         }
1007                         msg_ctx->num_waiters -= 1;
1008                         continue;
1009                 }
1010
1011                 state = tevent_req_data(
1012                         req, struct messaging_filtered_read_state);
1013                 if (state->filter(rec, state->private_data)) {
1014                         messaging_filtered_read_done(req, rec);
1015
1016                         /*
1017                          * Only the first one gets the fd-array
1018                          */
1019                         rec->num_fds = 0;
1020                         rec->fds = NULL;
1021                 }
1022
1023                 i += 1;
1024         }
1025
1026         /*
1027          * If the fd-array isn't used, just close it.
1028          */
1029         for (j=0; j < rec->num_fds; j++) {
1030                 int fd = rec->fds[j];
1031                 close(fd);
1032         }
1033         rec->num_fds = 0;
1034         rec->fds = NULL;
1035 }
1036
1037 static int mess_parent_dgm_cleanup(void *private_data);
1038 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1039
1040 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1041 {
1042         struct tevent_req *req;
1043
1044         req = background_job_send(
1045                 msg, msg->event_ctx, msg, NULL, 0,
1046                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1047                             60*15),
1048                 mess_parent_dgm_cleanup, msg);
1049         if (req == NULL) {
1050                 return false;
1051         }
1052         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1053         return true;
1054 }
1055
1056 static int mess_parent_dgm_cleanup(void *private_data)
1057 {
1058         int ret;
1059
1060         ret = messaging_dgm_wipe();
1061         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1062                    ret ? strerror(ret) : "ok"));
1063         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1064                            60*15);
1065 }
1066
1067 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1068 {
1069         struct messaging_context *msg = tevent_req_callback_data(
1070                 req, struct messaging_context);
1071         NTSTATUS status;
1072
1073         status = background_job_recv(req);
1074         TALLOC_FREE(req);
1075         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1076                   nt_errstr(status)));
1077
1078         req = background_job_send(
1079                 msg, msg->event_ctx, msg, NULL, 0,
1080                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1081                             60*15),
1082                 mess_parent_dgm_cleanup, msg);
1083         if (req == NULL) {
1084                 DEBUG(1, ("background_job_send failed\n"));
1085                 return;
1086         }
1087         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1088 }
1089
1090 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1091 {
1092         int ret;
1093
1094         if (pid == 0) {
1095                 ret = messaging_dgm_wipe();
1096         } else {
1097                 ret = messaging_dgm_cleanup(pid);
1098         }
1099
1100         return ret;
1101 }
1102
1103 struct tevent_context *messaging_tevent_context(
1104         struct messaging_context *msg_ctx)
1105 {
1106         return msg_ctx->event_ctx;
1107 }
1108
1109 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1110 {
1111         return msg_ctx->names_db;
1112 }
1113
1114 /** @} **/