s3:lib/messages: add missing allocation check for priv_path
[bbaumbach/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32_t msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         struct server_id_buf idbuf;
217         struct messaging_rec rec;
218         int64_t fds64[MIN(num_fds, INT8_MAX)];
219         size_t i;
220
221         if (msg_len < MESSAGE_HDR_LENGTH) {
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 goto close_fail;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228                 goto close_fail;
229         }
230
231         /*
232          * "consume" the fds by copying them and setting
233          * the original variable to -1
234          */
235         for (i=0; i < num_fds; i++) {
236                 fds64[i] = fds[i];
237                 fds[i] = -1;
238         }
239
240         rec = (struct messaging_rec) {
241                 .msg_version = MESSAGE_VERSION,
242                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244                 .num_fds = num_fds,
245                 .fds = fds64,
246         };
247
248         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
249
250         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251                    __func__, (unsigned)rec.msg_type,
252                    (unsigned)rec.buf.length,
253                    (unsigned)num_fds,
254                    server_id_str_buf(rec.src, &idbuf)));
255
256         messaging_dispatch_rec(msg_ctx, &rec);
257         return;
258
259 close_fail:
260         for (i=0; i < num_fds; i++) {
261                 close(fds[i]);
262         }
263 }
264
265 static int messaging_context_destructor(struct messaging_context *ctx)
266 {
267         unsigned i;
268
269         for (i=0; i<ctx->num_new_waiters; i++) {
270                 if (ctx->new_waiters[i] != NULL) {
271                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272                         ctx->new_waiters[i] = NULL;
273                 }
274         }
275         for (i=0; i<ctx->num_waiters; i++) {
276                 if (ctx->waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278                         ctx->waiters[i] = NULL;
279                 }
280         }
281
282         return 0;
283 }
284
285 static const char *private_path(const char *name)
286 {
287         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
288 }
289
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
291                                          struct tevent_context *ev)
292 {
293         struct messaging_context *ctx;
294         NTSTATUS status;
295         int ret;
296         const char *lck_path;
297         const char *priv_path;
298         bool ok;
299
300         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
301                 return NULL;
302         }
303
304         ctx->id = procid_self();
305         ctx->event_ctx = ev;
306
307         sec_init();
308
309         lck_path = lock_path("msg");
310         if (lck_path == NULL) {
311                 TALLOC_FREE(ctx);
312                 return NULL;
313         }
314
315         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
316                                               0755);
317         if (!ok) {
318                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
319                            __func__, strerror(errno)));
320                 TALLOC_FREE(ctx);
321                 return NULL;
322         }
323
324         priv_path = private_path("sock");
325         if (priv_path == NULL) {
326                 TALLOC_FREE(ctx);
327                 return NULL;
328         }
329
330         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
331                                               0700);
332         if (!ok) {
333                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
334                            __func__, strerror(errno)));
335                 TALLOC_FREE(ctx);
336                 return NULL;
337         }
338
339         ctx->msg_dgm_ref = messaging_dgm_ref(
340                 ctx, ctx->event_ctx, ctx->id.unique_id,
341                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
342
343         if (ctx->msg_dgm_ref == NULL) {
344                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
345                 TALLOC_FREE(ctx);
346                 return NULL;
347         }
348
349         talloc_set_destructor(ctx, messaging_context_destructor);
350
351         if (lp_clustering()) {
352                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
353
354                 if (!NT_STATUS_IS_OK(status)) {
355                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
356                                   nt_errstr(status)));
357                         TALLOC_FREE(ctx);
358                         return NULL;
359                 }
360         }
361         ctx->id.vnn = get_my_vnn();
362
363         ctx->names_db = server_id_db_init(
364                 ctx, ctx->id, lp_lock_directory(), 0,
365                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
366         if (ctx->names_db == NULL) {
367                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
368                 TALLOC_FREE(ctx);
369                 return NULL;
370         }
371
372         messaging_register(ctx, NULL, MSG_PING, ping_message);
373
374         /* Register some debugging related messages */
375
376         register_msg_pool_usage(ctx);
377         register_dmalloc_msgs(ctx);
378         debug_register_msgs(ctx);
379
380         return ctx;
381 }
382
383 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
384 {
385         return msg_ctx->id;
386 }
387
388 /*
389  * re-init after a fork
390  */
391 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
392 {
393         NTSTATUS status;
394         int ret;
395
396         TALLOC_FREE(msg_ctx->msg_dgm_ref);
397
398         msg_ctx->id = procid_self();
399
400         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
401                 msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
402                 private_path("sock"), lock_path("msg"),
403                 messaging_recv_cb, msg_ctx, &ret);
404
405         if (msg_ctx->msg_dgm_ref == NULL) {
406                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
407                 return map_nt_error_from_unix(ret);
408         }
409
410         TALLOC_FREE(msg_ctx->remote);
411
412         if (lp_clustering()) {
413                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
414                                               &msg_ctx->remote);
415
416                 if (!NT_STATUS_IS_OK(status)) {
417                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
418                                   nt_errstr(status)));
419                         return status;
420                 }
421         }
422
423         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
424
425         return NT_STATUS_OK;
426 }
427
428
429 /*
430  * Register a dispatch function for a particular message type. Allow multiple
431  * registrants
432 */
433 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
434                             void *private_data,
435                             uint32_t msg_type,
436                             void (*fn)(struct messaging_context *msg,
437                                        void *private_data, 
438                                        uint32_t msg_type, 
439                                        struct server_id server_id,
440                                        DATA_BLOB *data))
441 {
442         struct messaging_callback *cb;
443
444         DEBUG(5, ("Registering messaging pointer for type %u - "
445                   "private_data=%p\n",
446                   (unsigned)msg_type, private_data));
447
448         /*
449          * Only one callback per type
450          */
451
452         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
453                 /* we allow a second registration of the same message
454                    type if it has a different private pointer. This is
455                    needed in, for example, the internal notify code,
456                    which creates a new notify context for each tree
457                    connect, and expects to receive messages to each of
458                    them. */
459                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
460                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
461                                   (unsigned)msg_type, private_data));
462                         cb->fn = fn;
463                         cb->private_data = private_data;
464                         return NT_STATUS_OK;
465                 }
466         }
467
468         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
469                 return NT_STATUS_NO_MEMORY;
470         }
471
472         cb->msg_type = msg_type;
473         cb->fn = fn;
474         cb->private_data = private_data;
475
476         DLIST_ADD(msg_ctx->callbacks, cb);
477         return NT_STATUS_OK;
478 }
479
480 /*
481   De-register the function for a particular message type.
482 */
483 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
484                           void *private_data)
485 {
486         struct messaging_callback *cb, *next;
487
488         for (cb = ctx->callbacks; cb; cb = next) {
489                 next = cb->next;
490                 if ((cb->msg_type == msg_type)
491                     && (cb->private_data == private_data)) {
492                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
493                                   (unsigned)msg_type, private_data));
494                         DLIST_REMOVE(ctx->callbacks, cb);
495                         TALLOC_FREE(cb);
496                 }
497         }
498 }
499
500 /*
501   Send a message to a particular server
502 */
503 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
504                         struct server_id server, uint32_t msg_type,
505                         const DATA_BLOB *data)
506 {
507         struct iovec iov;
508
509         iov.iov_base = data->data;
510         iov.iov_len = data->length;
511
512         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
513 }
514
515 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
516                             struct server_id server, uint32_t msg_type,
517                             const uint8_t *buf, size_t len)
518 {
519         DATA_BLOB blob = data_blob_const(buf, len);
520         return messaging_send(msg_ctx, server, msg_type, &blob);
521 }
522
523 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
524                                  struct server_id src, struct server_id dst,
525                                  uint32_t msg_type,
526                                  const struct iovec *iov, int iovlen,
527                                  const int *fds, size_t num_fds)
528 {
529         int ret;
530         uint8_t hdr[MESSAGE_HDR_LENGTH];
531         struct iovec iov2[iovlen+1];
532
533         if (server_id_is_disconnected(&dst)) {
534                 return NT_STATUS_INVALID_PARAMETER_MIX;
535         }
536
537         if (num_fds > INT8_MAX) {
538                 return NT_STATUS_INVALID_PARAMETER_MIX;
539         }
540
541         if (!procid_is_local(&dst)) {
542                 if (num_fds > 0) {
543                         return NT_STATUS_NOT_SUPPORTED;
544                 }
545
546                 ret = msg_ctx->remote->send_fn(src, dst,
547                                                msg_type, iov, iovlen,
548                                                NULL, 0,
549                                                msg_ctx->remote);
550                 if (ret != 0) {
551                         return map_nt_error_from_unix(ret);
552                 }
553                 return NT_STATUS_OK;
554         }
555
556         message_hdr_put(hdr, msg_type, src, dst);
557         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
558         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
559
560         become_root();
561         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
562         unbecome_root();
563
564         if (ret != 0) {
565                 return map_nt_error_from_unix(ret);
566         }
567         return NT_STATUS_OK;
568 }
569
570 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
571                             struct server_id dst, uint32_t msg_type,
572                             const struct iovec *iov, int iovlen,
573                             const int *fds, size_t num_fds)
574 {
575         return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
576                                        iov, iovlen, fds, num_fds);
577 }
578
579 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
580                                                struct messaging_rec *rec)
581 {
582         struct messaging_rec *result;
583         size_t fds_size = sizeof(int64_t) * rec->num_fds;
584
585         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
586                                       rec->buf.length + fds_size);
587         if (result == NULL) {
588                 return NULL;
589         }
590         *result = *rec;
591
592         /* Doesn't fail, see talloc_pooled_object */
593
594         result->buf.data = talloc_memdup(result, rec->buf.data,
595                                          rec->buf.length);
596
597         result->fds = NULL;
598         if (result->num_fds > 0) {
599                 result->fds = talloc_memdup(result, rec->fds, fds_size);
600         }
601
602         return result;
603 }
604
605 struct messaging_filtered_read_state {
606         struct tevent_context *ev;
607         struct messaging_context *msg_ctx;
608         void *tevent_handle;
609
610         bool (*filter)(struct messaging_rec *rec, void *private_data);
611         void *private_data;
612
613         struct messaging_rec *rec;
614 };
615
616 static void messaging_filtered_read_cleanup(struct tevent_req *req,
617                                             enum tevent_req_state req_state);
618
619 struct tevent_req *messaging_filtered_read_send(
620         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
621         struct messaging_context *msg_ctx,
622         bool (*filter)(struct messaging_rec *rec, void *private_data),
623         void *private_data)
624 {
625         struct tevent_req *req;
626         struct messaging_filtered_read_state *state;
627         size_t new_waiters_len;
628
629         req = tevent_req_create(mem_ctx, &state,
630                                 struct messaging_filtered_read_state);
631         if (req == NULL) {
632                 return NULL;
633         }
634         state->ev = ev;
635         state->msg_ctx = msg_ctx;
636         state->filter = filter;
637         state->private_data = private_data;
638
639         /*
640          * We have to defer the callback here, as we might be called from
641          * within a different tevent_context than state->ev
642          */
643         tevent_req_defer_callback(req, state->ev);
644
645         state->tevent_handle = messaging_dgm_register_tevent_context(
646                 state, ev);
647         if (tevent_req_nomem(state, req)) {
648                 return tevent_req_post(req, ev);
649         }
650
651         /*
652          * We add ourselves to the "new_waiters" array, not the "waiters"
653          * array. If we are called from within messaging_read_done,
654          * messaging_dispatch_rec will be in an active for-loop on
655          * "waiters". We must be careful not to mess with this array, because
656          * it could mean that a single event is being delivered twice.
657          */
658
659         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
660
661         if (new_waiters_len == msg_ctx->num_new_waiters) {
662                 struct tevent_req **tmp;
663
664                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
665                                      struct tevent_req *, new_waiters_len+1);
666                 if (tevent_req_nomem(tmp, req)) {
667                         return tevent_req_post(req, ev);
668                 }
669                 msg_ctx->new_waiters = tmp;
670         }
671
672         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
673         msg_ctx->num_new_waiters += 1;
674         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
675
676         return req;
677 }
678
679 static void messaging_filtered_read_cleanup(struct tevent_req *req,
680                                             enum tevent_req_state req_state)
681 {
682         struct messaging_filtered_read_state *state = tevent_req_data(
683                 req, struct messaging_filtered_read_state);
684         struct messaging_context *msg_ctx = state->msg_ctx;
685         unsigned i;
686
687         tevent_req_set_cleanup_fn(req, NULL);
688
689         TALLOC_FREE(state->tevent_handle);
690
691         /*
692          * Just set the [new_]waiters entry to NULL, be careful not to mess
693          * with the other "waiters" array contents. We are often called from
694          * within "messaging_dispatch_rec", which loops over
695          * "waiters". Messing with the "waiters" array will mess up that
696          * for-loop.
697          */
698
699         for (i=0; i<msg_ctx->num_waiters; i++) {
700                 if (msg_ctx->waiters[i] == req) {
701                         msg_ctx->waiters[i] = NULL;
702                         return;
703                 }
704         }
705
706         for (i=0; i<msg_ctx->num_new_waiters; i++) {
707                 if (msg_ctx->new_waiters[i] == req) {
708                         msg_ctx->new_waiters[i] = NULL;
709                         return;
710                 }
711         }
712 }
713
714 static void messaging_filtered_read_done(struct tevent_req *req,
715                                          struct messaging_rec *rec)
716 {
717         struct messaging_filtered_read_state *state = tevent_req_data(
718                 req, struct messaging_filtered_read_state);
719
720         state->rec = messaging_rec_dup(state, rec);
721         if (tevent_req_nomem(state->rec, req)) {
722                 return;
723         }
724         tevent_req_done(req);
725 }
726
727 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
728                                  struct messaging_rec **presult)
729 {
730         struct messaging_filtered_read_state *state = tevent_req_data(
731                 req, struct messaging_filtered_read_state);
732         int err;
733
734         if (tevent_req_is_unix_error(req, &err)) {
735                 tevent_req_received(req);
736                 return err;
737         }
738         *presult = talloc_move(mem_ctx, &state->rec);
739         return 0;
740 }
741
742 struct messaging_read_state {
743         uint32_t msg_type;
744         struct messaging_rec *rec;
745 };
746
747 static bool messaging_read_filter(struct messaging_rec *rec,
748                                   void *private_data);
749 static void messaging_read_done(struct tevent_req *subreq);
750
751 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
752                                        struct tevent_context *ev,
753                                        struct messaging_context *msg,
754                                        uint32_t msg_type)
755 {
756         struct tevent_req *req, *subreq;
757         struct messaging_read_state *state;
758
759         req = tevent_req_create(mem_ctx, &state,
760                                 struct messaging_read_state);
761         if (req == NULL) {
762                 return NULL;
763         }
764         state->msg_type = msg_type;
765
766         subreq = messaging_filtered_read_send(state, ev, msg,
767                                               messaging_read_filter, state);
768         if (tevent_req_nomem(subreq, req)) {
769                 return tevent_req_post(req, ev);
770         }
771         tevent_req_set_callback(subreq, messaging_read_done, req);
772         return req;
773 }
774
775 static bool messaging_read_filter(struct messaging_rec *rec,
776                                   void *private_data)
777 {
778         struct messaging_read_state *state = talloc_get_type_abort(
779                 private_data, struct messaging_read_state);
780
781         if (rec->num_fds != 0) {
782                 return false;
783         }
784
785         return rec->msg_type == state->msg_type;
786 }
787
788 static void messaging_read_done(struct tevent_req *subreq)
789 {
790         struct tevent_req *req = tevent_req_callback_data(
791                 subreq, struct tevent_req);
792         struct messaging_read_state *state = tevent_req_data(
793                 req, struct messaging_read_state);
794         int ret;
795
796         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
797         TALLOC_FREE(subreq);
798         if (tevent_req_error(req, ret)) {
799                 return;
800         }
801         tevent_req_done(req);
802 }
803
804 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
805                         struct messaging_rec **presult)
806 {
807         struct messaging_read_state *state = tevent_req_data(
808                 req, struct messaging_read_state);
809         int err;
810
811         if (tevent_req_is_unix_error(req, &err)) {
812                 return err;
813         }
814         if (presult != NULL) {
815                 *presult = talloc_move(mem_ctx, &state->rec);
816         }
817         return 0;
818 }
819
820 struct messaging_handler_state {
821         struct tevent_context *ev;
822         struct messaging_context *msg_ctx;
823         uint32_t msg_type;
824         bool (*handler)(struct messaging_context *msg_ctx,
825                         struct messaging_rec **rec, void *private_data);
826         void *private_data;
827 };
828
829 static void messaging_handler_got_msg(struct tevent_req *subreq);
830
831 struct tevent_req *messaging_handler_send(
832         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
833         struct messaging_context *msg_ctx, uint32_t msg_type,
834         bool (*handler)(struct messaging_context *msg_ctx,
835                         struct messaging_rec **rec, void *private_data),
836         void *private_data)
837 {
838         struct tevent_req *req, *subreq;
839         struct messaging_handler_state *state;
840
841         req = tevent_req_create(mem_ctx, &state,
842                                 struct messaging_handler_state);
843         if (req == NULL) {
844                 return NULL;
845         }
846         state->ev = ev;
847         state->msg_ctx = msg_ctx;
848         state->msg_type = msg_type;
849         state->handler = handler;
850         state->private_data = private_data;
851
852         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
853                                      state->msg_type);
854         if (tevent_req_nomem(subreq, req)) {
855                 return tevent_req_post(req, ev);
856         }
857         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
858         return req;
859 }
860
861 static void messaging_handler_got_msg(struct tevent_req *subreq)
862 {
863         struct tevent_req *req = tevent_req_callback_data(
864                 subreq, struct tevent_req);
865         struct messaging_handler_state *state = tevent_req_data(
866                 req, struct messaging_handler_state);
867         struct messaging_rec *rec;
868         int ret;
869         bool ok;
870
871         ret = messaging_read_recv(subreq, state, &rec);
872         TALLOC_FREE(subreq);
873         if (tevent_req_error(req, ret)) {
874                 return;
875         }
876
877         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
878                                      state->msg_type);
879         if (tevent_req_nomem(subreq, req)) {
880                 return;
881         }
882         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
883
884         ok = state->handler(state->msg_ctx, &rec, state->private_data);
885         TALLOC_FREE(rec);
886         if (ok) {
887                 /*
888                  * Next round
889                  */
890                 return;
891         }
892         TALLOC_FREE(subreq);
893         tevent_req_done(req);
894 }
895
896 int messaging_handler_recv(struct tevent_req *req)
897 {
898         return tevent_req_simple_recv_unix(req);
899 }
900
901 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
902 {
903         if (msg_ctx->num_new_waiters == 0) {
904                 return true;
905         }
906
907         if (talloc_array_length(msg_ctx->waiters) <
908             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
909                 struct tevent_req **tmp;
910                 tmp = talloc_realloc(
911                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
912                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
913                 if (tmp == NULL) {
914                         DEBUG(1, ("%s: talloc failed\n", __func__));
915                         return false;
916                 }
917                 msg_ctx->waiters = tmp;
918         }
919
920         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
921                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
922
923         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
924         msg_ctx->num_new_waiters = 0;
925
926         return true;
927 }
928
929 /*
930   Dispatch one messaging_rec
931 */
932 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
933                                    struct messaging_rec *rec)
934 {
935         struct messaging_callback *cb, *next;
936         unsigned i;
937         size_t j;
938
939         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
940                 next = cb->next;
941                 if (cb->msg_type != rec->msg_type) {
942                         continue;
943                 }
944
945                 /*
946                  * the old style callbacks don't support fd passing
947                  */
948                 for (j=0; j < rec->num_fds; j++) {
949                         int fd = rec->fds[j];
950                         close(fd);
951                 }
952                 rec->num_fds = 0;
953                 rec->fds = NULL;
954
955                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
956                        rec->src, &rec->buf);
957
958                 /*
959                  * we continue looking for matching messages after finding
960                  * one. This matters for subsystems like the internal notify
961                  * code which register more than one handler for the same
962                  * message type
963                  */
964         }
965
966         if (!messaging_append_new_waiters(msg_ctx)) {
967                 for (j=0; j < rec->num_fds; j++) {
968                         int fd = rec->fds[j];
969                         close(fd);
970                 }
971                 rec->num_fds = 0;
972                 rec->fds = NULL;
973                 return;
974         }
975
976         i = 0;
977         while (i < msg_ctx->num_waiters) {
978                 struct tevent_req *req;
979                 struct messaging_filtered_read_state *state;
980
981                 req = msg_ctx->waiters[i];
982                 if (req == NULL) {
983                         /*
984                          * This got cleaned up. In the meantime,
985                          * move everything down one. We need
986                          * to keep the order of waiters, as
987                          * other code may depend on this.
988                          */
989                         if (i < msg_ctx->num_waiters - 1) {
990                                 memmove(&msg_ctx->waiters[i],
991                                         &msg_ctx->waiters[i+1],
992                                         sizeof(struct tevent_req *) *
993                                             (msg_ctx->num_waiters - i - 1));
994                         }
995                         msg_ctx->num_waiters -= 1;
996                         continue;
997                 }
998
999                 state = tevent_req_data(
1000                         req, struct messaging_filtered_read_state);
1001                 if (state->filter(rec, state->private_data)) {
1002                         messaging_filtered_read_done(req, rec);
1003
1004                         /*
1005                          * Only the first one gets the fd-array
1006                          */
1007                         rec->num_fds = 0;
1008                         rec->fds = NULL;
1009                 }
1010
1011                 i += 1;
1012         }
1013
1014         /*
1015          * If the fd-array isn't used, just close it.
1016          */
1017         for (j=0; j < rec->num_fds; j++) {
1018                 int fd = rec->fds[j];
1019                 close(fd);
1020         }
1021         rec->num_fds = 0;
1022         rec->fds = NULL;
1023 }
1024
1025 static int mess_parent_dgm_cleanup(void *private_data);
1026 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1027
1028 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1029 {
1030         struct tevent_req *req;
1031
1032         req = background_job_send(
1033                 msg, msg->event_ctx, msg, NULL, 0,
1034                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1035                             60*15),
1036                 mess_parent_dgm_cleanup, msg);
1037         if (req == NULL) {
1038                 return false;
1039         }
1040         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1041         return true;
1042 }
1043
1044 static int mess_parent_dgm_cleanup(void *private_data)
1045 {
1046         int ret;
1047
1048         ret = messaging_dgm_wipe();
1049         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1050                    ret ? strerror(ret) : "ok"));
1051         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1052                            60*15);
1053 }
1054
1055 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1056 {
1057         struct messaging_context *msg = tevent_req_callback_data(
1058                 req, struct messaging_context);
1059         NTSTATUS status;
1060
1061         status = background_job_recv(req);
1062         TALLOC_FREE(req);
1063         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1064                   nt_errstr(status)));
1065
1066         req = background_job_send(
1067                 msg, msg->event_ctx, msg, NULL, 0,
1068                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1069                             60*15),
1070                 mess_parent_dgm_cleanup, msg);
1071         if (req == NULL) {
1072                 DEBUG(1, ("background_job_send failed\n"));
1073                 return;
1074         }
1075         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1076 }
1077
1078 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1079 {
1080         int ret;
1081
1082         if (pid == 0) {
1083                 ret = messaging_dgm_wipe();
1084         } else {
1085                 ret = messaging_dgm_cleanup(pid);
1086         }
1087
1088         return ret;
1089 }
1090
1091 struct tevent_context *messaging_tevent_context(
1092         struct messaging_context *msg_ctx)
1093 {
1094         return msg_ctx->event_ctx;
1095 }
1096
1097 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1098 {
1099         return msg_ctx->names_db;
1100 }
1101
1102 /** @} **/