s3:messaging: add fds-array to messaging_send_iov()
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55
56 struct messaging_callback {
57         struct messaging_callback *prev, *next;
58         uint32 msg_type;
59         void (*fn)(struct messaging_context *msg, void *private_data, 
60                    uint32_t msg_type, 
61                    struct server_id server_id, DATA_BLOB *data);
62         void *private_data;
63 };
64
65 struct messaging_context {
66         struct server_id id;
67         struct tevent_context *event_ctx;
68         struct messaging_callback *callbacks;
69
70         struct tevent_req **new_waiters;
71         unsigned num_new_waiters;
72
73         struct tevent_req **waiters;
74         unsigned num_waiters;
75
76         struct messaging_backend *remote;
77 };
78
79 struct messaging_hdr {
80         int msg_type;
81         struct server_id dst;
82         struct server_id src;
83 };
84
85 /****************************************************************************
86  A useful function for testing the message system.
87 ****************************************************************************/
88
89 static void ping_message(struct messaging_context *msg_ctx,
90                          void *private_data,
91                          uint32_t msg_type,
92                          struct server_id src,
93                          DATA_BLOB *data)
94 {
95         struct server_id_buf idbuf;
96
97         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
98                   server_id_str_buf(src, &idbuf), (int)data->length,
99                   data->data ? (char *)data->data : ""));
100
101         messaging_send(msg_ctx, src, MSG_PONG, data);
102 }
103
104 /****************************************************************************
105  Register/replace a dispatch function for a particular message type.
106  JRA changed Dec 13 2006. Only one message handler now permitted per type.
107  *NOTE*: Dispatch functions must be able to cope with incoming
108  messages on an *odd* byte boundary.
109 ****************************************************************************/
110
111 struct msg_all {
112         struct messaging_context *msg_ctx;
113         int msg_type;
114         uint32 msg_flag;
115         const void *buf;
116         size_t len;
117         int n_sent;
118 };
119
120 /****************************************************************************
121  Send one of the messages for the broadcast.
122 ****************************************************************************/
123
124 static int traverse_fn(struct db_record *rec, const struct server_id *id,
125                        uint32_t msg_flags, void *state)
126 {
127         struct msg_all *msg_all = (struct msg_all *)state;
128         NTSTATUS status;
129
130         /* Don't send if the receiver hasn't registered an interest. */
131
132         if((msg_flags & msg_all->msg_flag) == 0) {
133                 return 0;
134         }
135
136         /* If the msg send fails because the pid was not found (i.e. smbd died), 
137          * the msg has already been deleted from the messages.tdb.*/
138
139         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
140                                     (const uint8_t *)msg_all->buf, msg_all->len);
141
142         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
143                 struct server_id_buf idbuf;
144
145                 /*
146                  * If the pid was not found delete the entry from
147                  * serverid.tdb
148                  */
149
150                 DEBUG(2, ("pid %s doesn't exist\n",
151                           server_id_str_buf(*id, &idbuf)));
152
153                 dbwrap_record_delete(rec);
154         }
155         msg_all->n_sent++;
156         return 0;
157 }
158
159 /**
160  * Send a message to all smbd processes.
161  *
162  * It isn't very efficient, but should be OK for the sorts of
163  * applications that use it. When we need efficient broadcast we can add
164  * it.
165  *
166  * @param n_sent Set to the number of messages sent.  This should be
167  * equal to the number of processes, but be careful for races.
168  *
169  * @retval True for success.
170  **/
171 bool message_send_all(struct messaging_context *msg_ctx,
172                       int msg_type,
173                       const void *buf, size_t len,
174                       int *n_sent)
175 {
176         struct msg_all msg_all;
177
178         msg_all.msg_type = msg_type;
179         if (msg_type < 0x100) {
180                 msg_all.msg_flag = FLAG_MSG_GENERAL;
181         } else if (msg_type > 0x100 && msg_type < 0x200) {
182                 msg_all.msg_flag = FLAG_MSG_NMBD;
183         } else if (msg_type > 0x200 && msg_type < 0x300) {
184                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
185         } else if (msg_type > 0x300 && msg_type < 0x400) {
186                 msg_all.msg_flag = FLAG_MSG_SMBD;
187         } else if (msg_type > 0x400 && msg_type < 0x600) {
188                 msg_all.msg_flag = FLAG_MSG_WINBIND;
189         } else if (msg_type > 4000 && msg_type < 5000) {
190                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
191         } else {
192                 return false;
193         }
194
195         msg_all.buf = buf;
196         msg_all.len = len;
197         msg_all.n_sent = 0;
198         msg_all.msg_ctx = msg_ctx;
199
200         serverid_traverse(traverse_fn, &msg_all);
201         if (n_sent)
202                 *n_sent = msg_all.n_sent;
203         return true;
204 }
205
206 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
207                               const int *fds, size_t num_fds,
208                               void *private_data)
209 {
210         struct messaging_context *msg_ctx = talloc_get_type_abort(
211                 private_data, struct messaging_context);
212         const struct messaging_hdr *hdr;
213         struct server_id_buf idbuf;
214         struct messaging_rec rec;
215         int64_t fds64[MIN(num_fds, INT8_MAX)];
216         size_t i;
217
218         if (msg_len < sizeof(*hdr)) {
219                 for (i=0; i < num_fds; i++) {
220                         close(fds[i]);
221                 }
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 return;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 for (i=0; i < num_fds; i++) {
228                         close(fds[i]);
229                 }
230                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
231                 return;
232         }
233
234         for (i=0; i < num_fds; i++) {
235                 fds64[i] = fds[i];
236         }
237
238         /*
239          * messages_dgm guarantees alignment, so we can cast here
240          */
241         hdr = (const struct messaging_hdr *)msg;
242
243         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
244                    __func__, (unsigned)hdr->msg_type,
245                    (unsigned)(msg_len - sizeof(*hdr)),
246                    (unsigned)num_fds,
247                    server_id_str_buf(hdr->src, &idbuf)));
248
249         rec = (struct messaging_rec) {
250                 .msg_version = MESSAGE_VERSION,
251                 .msg_type = hdr->msg_type,
252                 .src = hdr->src,
253                 .dest = hdr->dst,
254                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
255                 .buf.length = msg_len - sizeof(*hdr),
256                 .num_fds = num_fds,
257                 .fds = fds64,
258         };
259
260         messaging_dispatch_rec(msg_ctx, &rec);
261 }
262
263 static int messaging_context_destructor(struct messaging_context *ctx)
264 {
265         messaging_dgm_destroy();
266         return 0;
267 }
268
269 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
270                                          struct tevent_context *ev)
271 {
272         struct messaging_context *ctx;
273         NTSTATUS status;
274         int ret;
275
276         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
277                 return NULL;
278         }
279
280         ctx->id = procid_self();
281         ctx->event_ctx = ev;
282
283         sec_init();
284
285         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
286                                  lp_cache_directory(), sec_initial_uid(),
287                                  messaging_recv_cb, ctx);
288
289         if (ret != 0) {
290                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
291                 TALLOC_FREE(ctx);
292                 return NULL;
293         }
294
295         talloc_set_destructor(ctx, messaging_context_destructor);
296
297         if (lp_clustering()) {
298                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
299
300                 if (!NT_STATUS_IS_OK(status)) {
301                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
302                                   nt_errstr(status)));
303                         TALLOC_FREE(ctx);
304                         return NULL;
305                 }
306         }
307         ctx->id.vnn = get_my_vnn();
308
309         messaging_register(ctx, NULL, MSG_PING, ping_message);
310
311         /* Register some debugging related messages */
312
313         register_msg_pool_usage(ctx);
314         register_dmalloc_msgs(ctx);
315         debug_register_msgs(ctx);
316
317         return ctx;
318 }
319
320 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
321 {
322         return msg_ctx->id;
323 }
324
325 /*
326  * re-init after a fork
327  */
328 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
329 {
330         NTSTATUS status;
331         int ret;
332
333         messaging_dgm_destroy();
334
335         msg_ctx->id = procid_self();
336
337         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
338                                  lp_cache_directory(), sec_initial_uid(),
339                                  messaging_recv_cb, msg_ctx);
340         if (ret != 0) {
341                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
342                 return map_nt_error_from_unix(ret);
343         }
344
345         TALLOC_FREE(msg_ctx->remote);
346
347         if (lp_clustering()) {
348                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
349                                               &msg_ctx->remote);
350
351                 if (!NT_STATUS_IS_OK(status)) {
352                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
353                                   nt_errstr(status)));
354                         return status;
355                 }
356         }
357
358         return NT_STATUS_OK;
359 }
360
361
362 /*
363  * Register a dispatch function for a particular message type. Allow multiple
364  * registrants
365 */
366 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
367                             void *private_data,
368                             uint32_t msg_type,
369                             void (*fn)(struct messaging_context *msg,
370                                        void *private_data, 
371                                        uint32_t msg_type, 
372                                        struct server_id server_id,
373                                        DATA_BLOB *data))
374 {
375         struct messaging_callback *cb;
376
377         DEBUG(5, ("Registering messaging pointer for type %u - "
378                   "private_data=%p\n",
379                   (unsigned)msg_type, private_data));
380
381         /*
382          * Only one callback per type
383          */
384
385         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
386                 /* we allow a second registration of the same message
387                    type if it has a different private pointer. This is
388                    needed in, for example, the internal notify code,
389                    which creates a new notify context for each tree
390                    connect, and expects to receive messages to each of
391                    them. */
392                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
393                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
394                                   (unsigned)msg_type, private_data));
395                         cb->fn = fn;
396                         cb->private_data = private_data;
397                         return NT_STATUS_OK;
398                 }
399         }
400
401         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
402                 return NT_STATUS_NO_MEMORY;
403         }
404
405         cb->msg_type = msg_type;
406         cb->fn = fn;
407         cb->private_data = private_data;
408
409         DLIST_ADD(msg_ctx->callbacks, cb);
410         return NT_STATUS_OK;
411 }
412
413 /*
414   De-register the function for a particular message type.
415 */
416 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
417                           void *private_data)
418 {
419         struct messaging_callback *cb, *next;
420
421         for (cb = ctx->callbacks; cb; cb = next) {
422                 next = cb->next;
423                 if ((cb->msg_type == msg_type)
424                     && (cb->private_data == private_data)) {
425                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
426                                   (unsigned)msg_type, private_data));
427                         DLIST_REMOVE(ctx->callbacks, cb);
428                         TALLOC_FREE(cb);
429                 }
430         }
431 }
432
433 /*
434   Send a message to a particular server
435 */
436 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
437                         struct server_id server, uint32_t msg_type,
438                         const DATA_BLOB *data)
439 {
440         struct iovec iov;
441
442         iov.iov_base = data->data;
443         iov.iov_len = data->length;
444
445         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
446 }
447
448 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
449                             struct server_id server, uint32_t msg_type,
450                             const uint8_t *buf, size_t len)
451 {
452         DATA_BLOB blob = data_blob_const(buf, len);
453         return messaging_send(msg_ctx, server, msg_type, &blob);
454 }
455
456 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
457                             struct server_id server, uint32_t msg_type,
458                             const struct iovec *iov, int iovlen,
459                             const int *fds, size_t num_fds)
460 {
461         int ret;
462         struct messaging_hdr hdr;
463         struct iovec iov2[iovlen+1];
464
465         if (server_id_is_disconnected(&server)) {
466                 return NT_STATUS_INVALID_PARAMETER_MIX;
467         }
468
469         if (num_fds > INT8_MAX) {
470                 return NT_STATUS_INVALID_PARAMETER_MIX;
471         }
472
473         if (!procid_is_local(&server)) {
474                 if (num_fds > 0) {
475                         return NT_STATUS_NOT_SUPPORTED;
476                 }
477
478                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
479                                                msg_type, iov, iovlen,
480                                                NULL, 0,
481                                                msg_ctx->remote);
482                 if (ret != 0) {
483                         return map_nt_error_from_unix(ret);
484                 }
485                 return NT_STATUS_OK;
486         }
487
488         if (server_id_same_process(&msg_ctx->id, &server)) {
489                 struct messaging_rec rec;
490                 uint8_t *buf;
491
492                 /*
493                  * Self-send, directly dispatch
494                  */
495
496                 if (num_fds > 0) {
497                         return NT_STATUS_NOT_SUPPORTED;
498                 }
499
500                 buf = iov_buf(talloc_tos(), iov, iovlen);
501                 if (buf == NULL) {
502                         return NT_STATUS_NO_MEMORY;
503                 }
504
505                 rec = (struct messaging_rec) {
506                         .msg_version = MESSAGE_VERSION,
507                         .msg_type = msg_type & MSG_TYPE_MASK,
508                         .dest = server,
509                         .src = msg_ctx->id,
510                         .buf = data_blob_const(buf, talloc_get_size(buf)),
511                 };
512
513                 messaging_dispatch_rec(msg_ctx, &rec);
514                 TALLOC_FREE(buf);
515                 return NT_STATUS_OK;
516         }
517
518         hdr = (struct messaging_hdr) {
519                 .msg_type = msg_type,
520                 .dst = server,
521                 .src = msg_ctx->id
522         };
523         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
524         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
525
526         become_root();
527         ret = messaging_dgm_send(server.pid, iov2, iovlen+1, fds, num_fds);
528         unbecome_root();
529
530         if (ret != 0) {
531                 return map_nt_error_from_unix(ret);
532         }
533         return NT_STATUS_OK;
534 }
535
536 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
537                                                struct messaging_rec *rec)
538 {
539         struct messaging_rec *result;
540         size_t fds_size = sizeof(int64_t) * rec->num_fds;
541
542         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
543                                       rec->buf.length + fds_size);
544         if (result == NULL) {
545                 return NULL;
546         }
547         *result = *rec;
548
549         /* Doesn't fail, see talloc_pooled_object */
550
551         result->buf.data = talloc_memdup(result, rec->buf.data,
552                                          rec->buf.length);
553
554         result->fds = NULL;
555         if (result->num_fds > 0) {
556                 result->fds = talloc_array(result, int64_t, result->num_fds);
557                 memcpy(result->fds, rec->fds, fds_size);
558         }
559
560         return result;
561 }
562
563 struct messaging_filtered_read_state {
564         struct tevent_context *ev;
565         struct messaging_context *msg_ctx;
566         void *tevent_handle;
567
568         bool (*filter)(struct messaging_rec *rec, void *private_data);
569         void *private_data;
570
571         struct messaging_rec *rec;
572 };
573
574 static void messaging_filtered_read_cleanup(struct tevent_req *req,
575                                             enum tevent_req_state req_state);
576
577 struct tevent_req *messaging_filtered_read_send(
578         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
579         struct messaging_context *msg_ctx,
580         bool (*filter)(struct messaging_rec *rec, void *private_data),
581         void *private_data)
582 {
583         struct tevent_req *req;
584         struct messaging_filtered_read_state *state;
585         size_t new_waiters_len;
586
587         req = tevent_req_create(mem_ctx, &state,
588                                 struct messaging_filtered_read_state);
589         if (req == NULL) {
590                 return NULL;
591         }
592         state->ev = ev;
593         state->msg_ctx = msg_ctx;
594         state->filter = filter;
595         state->private_data = private_data;
596
597         /*
598          * We have to defer the callback here, as we might be called from
599          * within a different tevent_context than state->ev
600          */
601         tevent_req_defer_callback(req, state->ev);
602
603         state->tevent_handle = messaging_dgm_register_tevent_context(
604                 state, ev);
605         if (tevent_req_nomem(state, req)) {
606                 return tevent_req_post(req, ev);
607         }
608
609         /*
610          * We add ourselves to the "new_waiters" array, not the "waiters"
611          * array. If we are called from within messaging_read_done,
612          * messaging_dispatch_rec will be in an active for-loop on
613          * "waiters". We must be careful not to mess with this array, because
614          * it could mean that a single event is being delivered twice.
615          */
616
617         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
618
619         if (new_waiters_len == msg_ctx->num_new_waiters) {
620                 struct tevent_req **tmp;
621
622                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
623                                      struct tevent_req *, new_waiters_len+1);
624                 if (tevent_req_nomem(tmp, req)) {
625                         return tevent_req_post(req, ev);
626                 }
627                 msg_ctx->new_waiters = tmp;
628         }
629
630         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
631         msg_ctx->num_new_waiters += 1;
632         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
633
634         return req;
635 }
636
637 static void messaging_filtered_read_cleanup(struct tevent_req *req,
638                                             enum tevent_req_state req_state)
639 {
640         struct messaging_filtered_read_state *state = tevent_req_data(
641                 req, struct messaging_filtered_read_state);
642         struct messaging_context *msg_ctx = state->msg_ctx;
643         unsigned i;
644
645         tevent_req_set_cleanup_fn(req, NULL);
646
647         TALLOC_FREE(state->tevent_handle);
648
649         /*
650          * Just set the [new_]waiters entry to NULL, be careful not to mess
651          * with the other "waiters" array contents. We are often called from
652          * within "messaging_dispatch_rec", which loops over
653          * "waiters". Messing with the "waiters" array will mess up that
654          * for-loop.
655          */
656
657         for (i=0; i<msg_ctx->num_waiters; i++) {
658                 if (msg_ctx->waiters[i] == req) {
659                         msg_ctx->waiters[i] = NULL;
660                         return;
661                 }
662         }
663
664         for (i=0; i<msg_ctx->num_new_waiters; i++) {
665                 if (msg_ctx->new_waiters[i] == req) {
666                         msg_ctx->new_waiters[i] = NULL;
667                         return;
668                 }
669         }
670 }
671
672 static void messaging_filtered_read_done(struct tevent_req *req,
673                                          struct messaging_rec *rec)
674 {
675         struct messaging_filtered_read_state *state = tevent_req_data(
676                 req, struct messaging_filtered_read_state);
677
678         state->rec = messaging_rec_dup(state, rec);
679         if (tevent_req_nomem(state->rec, req)) {
680                 return;
681         }
682         tevent_req_done(req);
683 }
684
685 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
686                                  struct messaging_rec **presult)
687 {
688         struct messaging_filtered_read_state *state = tevent_req_data(
689                 req, struct messaging_filtered_read_state);
690         int err;
691
692         if (tevent_req_is_unix_error(req, &err)) {
693                 tevent_req_received(req);
694                 return err;
695         }
696         *presult = talloc_move(mem_ctx, &state->rec);
697         return 0;
698 }
699
700 struct messaging_read_state {
701         uint32_t msg_type;
702         struct messaging_rec *rec;
703 };
704
705 static bool messaging_read_filter(struct messaging_rec *rec,
706                                   void *private_data);
707 static void messaging_read_done(struct tevent_req *subreq);
708
709 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
710                                        struct tevent_context *ev,
711                                        struct messaging_context *msg,
712                                        uint32_t msg_type)
713 {
714         struct tevent_req *req, *subreq;
715         struct messaging_read_state *state;
716
717         req = tevent_req_create(mem_ctx, &state,
718                                 struct messaging_read_state);
719         if (req == NULL) {
720                 return NULL;
721         }
722         state->msg_type = msg_type;
723
724         subreq = messaging_filtered_read_send(state, ev, msg,
725                                               messaging_read_filter, state);
726         if (tevent_req_nomem(subreq, req)) {
727                 return tevent_req_post(req, ev);
728         }
729         tevent_req_set_callback(subreq, messaging_read_done, req);
730         return req;
731 }
732
733 static bool messaging_read_filter(struct messaging_rec *rec,
734                                   void *private_data)
735 {
736         struct messaging_read_state *state = talloc_get_type_abort(
737                 private_data, struct messaging_read_state);
738
739         if (rec->num_fds != 0) {
740                 return false;
741         }
742
743         return rec->msg_type == state->msg_type;
744 }
745
746 static void messaging_read_done(struct tevent_req *subreq)
747 {
748         struct tevent_req *req = tevent_req_callback_data(
749                 subreq, struct tevent_req);
750         struct messaging_read_state *state = tevent_req_data(
751                 req, struct messaging_read_state);
752         int ret;
753
754         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
755         TALLOC_FREE(subreq);
756         if (tevent_req_error(req, ret)) {
757                 return;
758         }
759         tevent_req_done(req);
760 }
761
762 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
763                         struct messaging_rec **presult)
764 {
765         struct messaging_read_state *state = tevent_req_data(
766                 req, struct messaging_read_state);
767         int err;
768
769         if (tevent_req_is_unix_error(req, &err)) {
770                 return err;
771         }
772         if (presult != NULL) {
773                 *presult = talloc_move(mem_ctx, &state->rec);
774         }
775         return 0;
776 }
777
778 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
779 {
780         if (msg_ctx->num_new_waiters == 0) {
781                 return true;
782         }
783
784         if (talloc_array_length(msg_ctx->waiters) <
785             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
786                 struct tevent_req **tmp;
787                 tmp = talloc_realloc(
788                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
789                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
790                 if (tmp == NULL) {
791                         DEBUG(1, ("%s: talloc failed\n", __func__));
792                         return false;
793                 }
794                 msg_ctx->waiters = tmp;
795         }
796
797         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
798                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
799
800         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
801         msg_ctx->num_new_waiters = 0;
802
803         return true;
804 }
805
806 struct messaging_defer_callback_state {
807         struct messaging_context *msg_ctx;
808         struct messaging_rec *rec;
809         void (*fn)(struct messaging_context *msg, void *private_data,
810                    uint32_t msg_type, struct server_id server_id,
811                    DATA_BLOB *data);
812         void *private_data;
813 };
814
815 static void messaging_defer_callback_trigger(struct tevent_context *ev,
816                                              struct tevent_immediate *im,
817                                              void *private_data);
818
819 static void messaging_defer_callback(
820         struct messaging_context *msg_ctx, struct messaging_rec *rec,
821         void (*fn)(struct messaging_context *msg, void *private_data,
822                    uint32_t msg_type, struct server_id server_id,
823                    DATA_BLOB *data),
824         void *private_data)
825 {
826         struct messaging_defer_callback_state *state;
827         struct tevent_immediate *im;
828
829         state = talloc(msg_ctx, struct messaging_defer_callback_state);
830         if (state == NULL) {
831                 DEBUG(1, ("talloc failed\n"));
832                 return;
833         }
834         state->msg_ctx = msg_ctx;
835         state->fn = fn;
836         state->private_data = private_data;
837
838         state->rec = messaging_rec_dup(state, rec);
839         if (state->rec == NULL) {
840                 DEBUG(1, ("talloc failed\n"));
841                 TALLOC_FREE(state);
842                 return;
843         }
844
845         im = tevent_create_immediate(state);
846         if (im == NULL) {
847                 DEBUG(1, ("tevent_create_immediate failed\n"));
848                 TALLOC_FREE(state);
849                 return;
850         }
851         tevent_schedule_immediate(im, msg_ctx->event_ctx,
852                                   messaging_defer_callback_trigger, state);
853 }
854
855 static void messaging_defer_callback_trigger(struct tevent_context *ev,
856                                              struct tevent_immediate *im,
857                                              void *private_data)
858 {
859         struct messaging_defer_callback_state *state = talloc_get_type_abort(
860                 private_data, struct messaging_defer_callback_state);
861         struct messaging_rec *rec = state->rec;
862
863         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
864                   &rec->buf);
865         TALLOC_FREE(state);
866 }
867
868 /*
869   Dispatch one messaging_rec
870 */
871 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
872                             struct messaging_rec *rec)
873 {
874         struct messaging_callback *cb, *next;
875         unsigned i;
876         size_t j;
877
878         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
879                 next = cb->next;
880                 if (cb->msg_type != rec->msg_type) {
881                         continue;
882                 }
883
884                 /*
885                  * the old style callbacks don't support fd passing
886                  */
887                 for (j=0; j < rec->num_fds; j++) {
888                         int fd = rec->fds[j];
889                         close(fd);
890                 }
891                 rec->num_fds = 0;
892                 rec->fds = NULL;
893
894                 if (server_id_same_process(&rec->src, &rec->dest)) {
895                         /*
896                          * This is a self-send. We are called here from
897                          * messaging_send(), and we don't want to directly
898                          * recurse into the callback but go via a
899                          * tevent_loop_once
900                          */
901                         messaging_defer_callback(msg_ctx, rec, cb->fn,
902                                                  cb->private_data);
903                 } else {
904                         /*
905                          * This comes from a different process. we are called
906                          * from the event loop, so we should call back
907                          * directly.
908                          */
909                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
910                                rec->src, &rec->buf);
911                 }
912                 /*
913                  * we continue looking for matching messages after finding
914                  * one. This matters for subsystems like the internal notify
915                  * code which register more than one handler for the same
916                  * message type
917                  */
918         }
919
920         if (!messaging_append_new_waiters(msg_ctx)) {
921                 for (j=0; j < rec->num_fds; j++) {
922                         int fd = rec->fds[j];
923                         close(fd);
924                 }
925                 rec->num_fds = 0;
926                 rec->fds = NULL;
927                 return;
928         }
929
930         i = 0;
931         while (i < msg_ctx->num_waiters) {
932                 struct tevent_req *req;
933                 struct messaging_filtered_read_state *state;
934
935                 req = msg_ctx->waiters[i];
936                 if (req == NULL) {
937                         /*
938                          * This got cleaned up. In the meantime,
939                          * move everything down one. We need
940                          * to keep the order of waiters, as
941                          * other code may depend on this.
942                          */
943                         if (i < msg_ctx->num_waiters - 1) {
944                                 memmove(&msg_ctx->waiters[i],
945                                         &msg_ctx->waiters[i+1],
946                                         sizeof(struct tevent_req *) *
947                                             (msg_ctx->num_waiters - i - 1));
948                         }
949                         msg_ctx->num_waiters -= 1;
950                         continue;
951                 }
952
953                 state = tevent_req_data(
954                         req, struct messaging_filtered_read_state);
955                 if (state->filter(rec, state->private_data)) {
956                         messaging_filtered_read_done(req, rec);
957
958                         /*
959                          * Only the first one gets the fd-array
960                          */
961                         rec->num_fds = 0;
962                         rec->fds = NULL;
963                 }
964
965                 i += 1;
966         }
967
968         /*
969          * If the fd-array isn't used, just close it.
970          */
971         for (j=0; j < rec->num_fds; j++) {
972                 int fd = rec->fds[j];
973                 close(fd);
974         }
975         rec->num_fds = 0;
976         rec->fds = NULL;
977 }
978
979 static int mess_parent_dgm_cleanup(void *private_data);
980 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
981
982 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
983 {
984         struct tevent_req *req;
985
986         req = background_job_send(
987                 msg, msg->event_ctx, msg, NULL, 0,
988                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
989                             60*15),
990                 mess_parent_dgm_cleanup, msg);
991         if (req == NULL) {
992                 return false;
993         }
994         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
995         return true;
996 }
997
998 static int mess_parent_dgm_cleanup(void *private_data)
999 {
1000         int ret;
1001
1002         ret = messaging_dgm_wipe();
1003         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1004                    ret ? strerror(ret) : "ok"));
1005         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1006                            60*15);
1007 }
1008
1009 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1010 {
1011         struct messaging_context *msg = tevent_req_callback_data(
1012                 req, struct messaging_context);
1013         NTSTATUS status;
1014
1015         status = background_job_recv(req);
1016         TALLOC_FREE(req);
1017         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1018                   nt_errstr(status)));
1019
1020         req = background_job_send(
1021                 msg, msg->event_ctx, msg, NULL, 0,
1022                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1023                             60*15),
1024                 mess_parent_dgm_cleanup, msg);
1025         if (req == NULL) {
1026                 DEBUG(1, ("background_job_send failed\n"));
1027         }
1028         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1029 }
1030
1031 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1032 {
1033         int ret;
1034
1035         if (pid == 0) {
1036                 ret = messaging_dgm_wipe();
1037         } else {
1038                 ret = messaging_dgm_cleanup(pid);
1039         }
1040
1041         return ret;
1042 }
1043
1044 struct tevent_context *messaging_tevent_context(
1045         struct messaging_context *msg_ctx)
1046 {
1047         return msg_ctx->event_ctx;
1048 }
1049
1050 /** @} **/