messaging: Broadcast messages to all event contexts
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
61
62 struct messaging_callback {
63         struct messaging_callback *prev, *next;
64         uint32_t msg_type;
65         void (*fn)(struct messaging_context *msg, void *private_data, 
66                    uint32_t msg_type, 
67                    struct server_id server_id, DATA_BLOB *data);
68         void *private_data;
69 };
70
71 struct messaging_registered_ev {
72         struct tevent_context *ev;
73         struct tevent_immediate *im;
74         size_t refcount;
75 };
76
77 struct messaging_context {
78         struct server_id id;
79         struct tevent_context *event_ctx;
80         struct messaging_callback *callbacks;
81
82         struct messaging_rec *posted_msgs;
83
84         struct messaging_registered_ev *event_contexts;
85
86         struct tevent_req **new_waiters;
87         size_t num_new_waiters;
88
89         struct tevent_req **waiters;
90         size_t num_waiters;
91
92         void *msg_dgm_ref;
93         struct messaging_backend *remote;
94
95         struct server_id_db *names_db;
96 };
97
98 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
99                                                struct messaging_rec *rec);
100 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
101                                        struct messaging_rec *rec);
102 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
103                                        struct tevent_context *ev,
104                                        struct messaging_rec *rec);
105 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
106                                    struct tevent_context *ev,
107                                    struct messaging_rec *rec);
108
109 /****************************************************************************
110  A useful function for testing the message system.
111 ****************************************************************************/
112
113 static void ping_message(struct messaging_context *msg_ctx,
114                          void *private_data,
115                          uint32_t msg_type,
116                          struct server_id src,
117                          DATA_BLOB *data)
118 {
119         struct server_id_buf idbuf;
120
121         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
122                   server_id_str_buf(src, &idbuf), (int)data->length,
123                   data->data ? (char *)data->data : ""));
124
125         messaging_send(msg_ctx, src, MSG_PONG, data);
126 }
127
128 struct messaging_rec *messaging_rec_create(
129         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
130         uint32_t msg_type, const struct iovec *iov, int iovlen,
131         const int *fds, size_t num_fds)
132 {
133         ssize_t buflen;
134         uint8_t *buf;
135         struct messaging_rec *result;
136
137         if (num_fds > INT8_MAX) {
138                 return NULL;
139         }
140
141         buflen = iov_buflen(iov, iovlen);
142         if (buflen == -1) {
143                 return NULL;
144         }
145         buf = talloc_array(mem_ctx, uint8_t, buflen);
146         if (buf == NULL) {
147                 return NULL;
148         }
149         iov_buf(iov, iovlen, buf, buflen);
150
151         {
152                 struct messaging_rec rec;
153                 int64_t fds64[num_fds];
154                 size_t i;
155
156                 for (i=0; i<num_fds; i++) {
157                         fds64[i] = fds[i];
158                 }
159
160                 rec = (struct messaging_rec) {
161                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
162                         .src = src, .dest = dst,
163                         .buf.data = buf, .buf.length = buflen,
164                         .num_fds = num_fds, .fds = fds64,
165                 };
166
167                 result = messaging_rec_dup(mem_ctx, &rec);
168         }
169
170         TALLOC_FREE(buf);
171
172         return result;
173 }
174
175 static bool messaging_register_event_context(struct messaging_context *ctx,
176                                              struct tevent_context *ev)
177 {
178         size_t i, num_event_contexts;
179         struct messaging_registered_ev *free_reg = NULL;
180         struct messaging_registered_ev *tmp;
181
182         num_event_contexts = talloc_array_length(ctx->event_contexts);
183
184         for (i=0; i<num_event_contexts; i++) {
185                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
186
187                 if (reg->ev == ev) {
188                         reg->refcount += 1;
189                         return true;
190                 }
191                 if (reg->refcount == 0) {
192                         if (reg->ev != NULL) {
193                                 abort();
194                         }
195                         free_reg = reg;
196                 }
197         }
198
199         if (free_reg == NULL) {
200                 tmp = talloc_realloc(ctx, ctx->event_contexts,
201                                      struct messaging_registered_ev,
202                                      num_event_contexts+1);
203                 if (tmp == NULL) {
204                         return false;
205                 }
206                 ctx->event_contexts = tmp;
207
208                 free_reg = &ctx->event_contexts[num_event_contexts];
209         }
210
211         *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
212
213         return true;
214 }
215
216 static bool messaging_deregister_event_context(struct messaging_context *ctx,
217                                                struct tevent_context *ev)
218 {
219         size_t i, num_event_contexts;
220
221         num_event_contexts = talloc_array_length(ctx->event_contexts);
222
223         for (i=0; i<num_event_contexts; i++) {
224                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
225
226                 if (reg->ev == ev) {
227                         if (reg->refcount == 0) {
228                                 return false;
229                         }
230                         reg->refcount -= 1;
231
232                         if (reg->refcount == 0) {
233                                 /*
234                                  * Not strictly necessary, just
235                                  * paranoia
236                                  */
237                                 reg->ev = NULL;
238
239                                 /*
240                                  * Do not talloc_free(reg->im),
241                                  * recycle immediates events.
242                                  */
243                         }
244                         return true;
245                 }
246         }
247         return false;
248 }
249
250 static void messaging_post_main_event_context(struct tevent_context *ev,
251                                               struct tevent_immediate *im,
252                                               void *private_data)
253 {
254         struct messaging_context *ctx = talloc_get_type_abort(
255                 private_data, struct messaging_context);
256
257         while (ctx->posted_msgs != NULL) {
258                 struct messaging_rec *rec = ctx->posted_msgs;
259                 bool consumed;
260
261                 DLIST_REMOVE(ctx->posted_msgs, rec);
262
263                 consumed = messaging_dispatch_classic(ctx, rec);
264                 if (!consumed) {
265                         consumed = messaging_dispatch_waiters(
266                                 ctx, ctx->event_ctx, rec);
267                 }
268
269                 if (!consumed) {
270                         uint8_t i;
271
272                         for (i=0; i<rec->num_fds; i++) {
273                                 close(rec->fds[i]);
274                         }
275                 }
276
277                 TALLOC_FREE(rec);
278         }
279 }
280
281 static void messaging_post_sub_event_context(struct tevent_context *ev,
282                                              struct tevent_immediate *im,
283                                              void *private_data)
284 {
285         struct messaging_context *ctx = talloc_get_type_abort(
286                 private_data, struct messaging_context);
287         struct messaging_rec *rec, *next;
288
289         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
290                 bool consumed;
291
292                 next = rec->next;
293
294                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
295                 if (consumed) {
296                         DLIST_REMOVE(ctx->posted_msgs, rec);
297                         TALLOC_FREE(rec);
298                 }
299         }
300 }
301
302 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
303 {
304         size_t i, num_event_contexts;
305
306         num_event_contexts = talloc_array_length(ctx->event_contexts);
307
308         for (i=0; i<num_event_contexts; i++) {
309                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
310
311                 if (reg->refcount == 0) {
312                         continue;
313                 }
314
315                 if (reg->im == NULL) {
316                         reg->im = tevent_create_immediate(
317                                 ctx->event_contexts);
318                 }
319                 if (reg->im == NULL) {
320                         DBG_WARNING("Could not create immediate\n");
321                         continue;
322                 }
323
324                 /*
325                  * We depend on schedule_immediate to work
326                  * multiple times. Might be a bit inefficient,
327                  * but this needs to be proven in tests. The
328                  * alternatively would be to track whether the
329                  * immediate has already been scheduled. For
330                  * now, avoid that complexity here.
331                  */
332
333                 if (reg->ev == ctx->event_ctx) {
334                         tevent_schedule_immediate(
335                                 reg->im, reg->ev,
336                                 messaging_post_main_event_context,
337                                 ctx);
338                 } else {
339                         tevent_schedule_immediate(
340                                 reg->im, reg->ev,
341                                 messaging_post_sub_event_context,
342                                 ctx);
343                 }
344
345         }
346         return true;
347 }
348
349 static void messaging_recv_cb(struct tevent_context *ev,
350                               const uint8_t *msg, size_t msg_len,
351                               int *fds, size_t num_fds,
352                               void *private_data)
353 {
354         struct messaging_context *msg_ctx = talloc_get_type_abort(
355                 private_data, struct messaging_context);
356         struct server_id_buf idbuf;
357         struct messaging_rec rec;
358         int64_t fds64[MIN(num_fds, INT8_MAX)];
359         size_t i;
360
361         if (msg_len < MESSAGE_HDR_LENGTH) {
362                 DBG_WARNING("message too short: %zu\n", msg_len);
363                 goto close_fail;
364         }
365
366         if (num_fds > INT8_MAX) {
367                 DBG_WARNING("too many fds: %zu\n", num_fds);
368                 goto close_fail;
369         }
370
371         /*
372          * "consume" the fds by copying them and setting
373          * the original variable to -1
374          */
375         for (i=0; i < num_fds; i++) {
376                 fds64[i] = fds[i];
377                 fds[i] = -1;
378         }
379
380         rec = (struct messaging_rec) {
381                 .msg_version = MESSAGE_VERSION,
382                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
383                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
384                 .num_fds = num_fds,
385                 .fds = fds64,
386         };
387
388         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
389
390         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
391                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
392                   server_id_str_buf(rec.src, &idbuf));
393
394         messaging_dispatch_rec(msg_ctx, ev, &rec);
395         return;
396
397 close_fail:
398         for (i=0; i < num_fds; i++) {
399                 close(fds[i]);
400         }
401 }
402
403 static int messaging_context_destructor(struct messaging_context *ctx)
404 {
405         size_t i;
406
407         for (i=0; i<ctx->num_new_waiters; i++) {
408                 if (ctx->new_waiters[i] != NULL) {
409                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
410                         ctx->new_waiters[i] = NULL;
411                 }
412         }
413         for (i=0; i<ctx->num_waiters; i++) {
414                 if (ctx->waiters[i] != NULL) {
415                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
416                         ctx->waiters[i] = NULL;
417                 }
418         }
419
420         /*
421          * The immediates from messaging_alert_event_contexts
422          * reference "ctx". Don't let them outlive the
423          * messaging_context we're destroying here.
424          */
425         TALLOC_FREE(ctx->event_contexts);
426
427         return 0;
428 }
429
430 static const char *private_path(const char *name)
431 {
432         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
433 }
434
435 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
436                                         struct tevent_context *ev,
437                                         struct messaging_context **pmsg_ctx)
438 {
439         TALLOC_CTX *frame;
440         struct messaging_context *ctx;
441         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
442         int ret;
443         const char *lck_path;
444         const char *priv_path;
445         bool ok;
446
447         lck_path = lock_path("msg.lock");
448         if (lck_path == NULL) {
449                 return NT_STATUS_NO_MEMORY;
450         }
451
452         ok = directory_create_or_exist_strict(lck_path,
453                                               sec_initial_uid(),
454                                               0755);
455         if (!ok) {
456                 DBG_DEBUG("Could not create lock directory: %s\n",
457                           strerror(errno));
458                 return NT_STATUS_ACCESS_DENIED;
459         }
460
461         priv_path = private_path("msg.sock");
462         if (priv_path == NULL) {
463                 return NT_STATUS_NO_MEMORY;
464         }
465
466         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
467                                               0700);
468         if (!ok) {
469                 DBG_DEBUG("Could not create msg directory: %s\n",
470                           strerror(errno));
471                 return NT_STATUS_ACCESS_DENIED;
472         }
473
474         frame = talloc_stackframe();
475         if (frame == NULL) {
476                 return NT_STATUS_NO_MEMORY;
477         }
478
479         ctx = talloc_zero(frame, struct messaging_context);
480         if (ctx == NULL) {
481                 status = NT_STATUS_NO_MEMORY;
482                 goto done;
483         }
484
485         ctx->id = (struct server_id) {
486                 .pid = getpid(), .vnn = NONCLUSTER_VNN
487         };
488
489         ctx->event_ctx = ev;
490
491         ok = messaging_register_event_context(ctx, ev);
492         if (!ok) {
493                 status = NT_STATUS_NO_MEMORY;
494                 goto done;
495         }
496
497         sec_init();
498
499         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
500                                              ctx->event_ctx,
501                                              &ctx->id.unique_id,
502                                              priv_path,
503                                              lck_path,
504                                              messaging_recv_cb,
505                                              ctx,
506                                              &ret);
507         if (ctx->msg_dgm_ref == NULL) {
508                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
509                 status = map_nt_error_from_unix(ret);
510                 goto done;
511         }
512         talloc_set_destructor(ctx, messaging_context_destructor);
513
514         if (lp_clustering()) {
515                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
516
517                 if (ret != 0) {
518                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
519                                   strerror(ret)));
520                         status = map_nt_error_from_unix(ret);
521                         goto done;
522                 }
523         }
524         ctx->id.vnn = get_my_vnn();
525
526         ctx->names_db = server_id_db_init(ctx,
527                                           ctx->id,
528                                           lp_lock_directory(),
529                                           0,
530                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
531         if (ctx->names_db == NULL) {
532                 DBG_DEBUG("server_id_db_init failed\n");
533                 status = NT_STATUS_NO_MEMORY;
534                 goto done;
535         }
536
537         messaging_register(ctx, NULL, MSG_PING, ping_message);
538
539         /* Register some debugging related messages */
540
541         register_msg_pool_usage(ctx);
542         register_dmalloc_msgs(ctx);
543         debug_register_msgs(ctx);
544
545         {
546                 struct server_id_buf tmp;
547                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
548         }
549
550         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
551
552         status = NT_STATUS_OK;
553 done:
554         TALLOC_FREE(frame);
555
556         return status;
557 }
558
559 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
560                                          struct tevent_context *ev)
561 {
562         struct messaging_context *ctx = NULL;
563         NTSTATUS status;
564
565         status = messaging_init_internal(mem_ctx,
566                                          ev,
567                                          &ctx);
568         if (!NT_STATUS_IS_OK(status)) {
569                 return NULL;
570         }
571
572         return ctx;
573 }
574
575 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
576                                struct tevent_context *ev,
577                                struct messaging_context **pmsg_ctx)
578 {
579         return messaging_init_internal(mem_ctx,
580                                         ev,
581                                         pmsg_ctx);
582 }
583
584 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
585 {
586         return msg_ctx->id;
587 }
588
589 /*
590  * re-init after a fork
591  */
592 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
593 {
594         int ret;
595         char *lck_path;
596
597         TALLOC_FREE(msg_ctx->msg_dgm_ref);
598
599         msg_ctx->id = (struct server_id) {
600                 .pid = getpid(), .vnn = msg_ctx->id.vnn
601         };
602
603         lck_path = lock_path("msg.lock");
604         if (lck_path == NULL) {
605                 return NT_STATUS_NO_MEMORY;
606         }
607
608         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
609                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
610                 private_path("msg.sock"), lck_path,
611                 messaging_recv_cb, msg_ctx, &ret);
612
613         if (msg_ctx->msg_dgm_ref == NULL) {
614                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
615                 return map_nt_error_from_unix(ret);
616         }
617
618         if (lp_clustering()) {
619                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
620                                              msg_ctx->remote);
621
622                 if (ret != 0) {
623                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
624                                   strerror(ret)));
625                         return map_nt_error_from_unix(ret);
626                 }
627         }
628
629         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
630
631         return NT_STATUS_OK;
632 }
633
634
635 /*
636  * Register a dispatch function for a particular message type. Allow multiple
637  * registrants
638 */
639 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
640                             void *private_data,
641                             uint32_t msg_type,
642                             void (*fn)(struct messaging_context *msg,
643                                        void *private_data, 
644                                        uint32_t msg_type, 
645                                        struct server_id server_id,
646                                        DATA_BLOB *data))
647 {
648         struct messaging_callback *cb;
649
650         DEBUG(5, ("Registering messaging pointer for type %u - "
651                   "private_data=%p\n",
652                   (unsigned)msg_type, private_data));
653
654         /*
655          * Only one callback per type
656          */
657
658         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
659                 /* we allow a second registration of the same message
660                    type if it has a different private pointer. This is
661                    needed in, for example, the internal notify code,
662                    which creates a new notify context for each tree
663                    connect, and expects to receive messages to each of
664                    them. */
665                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
666                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
667                                   (unsigned)msg_type, private_data));
668                         cb->fn = fn;
669                         cb->private_data = private_data;
670                         return NT_STATUS_OK;
671                 }
672         }
673
674         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
675                 return NT_STATUS_NO_MEMORY;
676         }
677
678         cb->msg_type = msg_type;
679         cb->fn = fn;
680         cb->private_data = private_data;
681
682         DLIST_ADD(msg_ctx->callbacks, cb);
683         return NT_STATUS_OK;
684 }
685
686 /*
687   De-register the function for a particular message type.
688 */
689 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
690                           void *private_data)
691 {
692         struct messaging_callback *cb, *next;
693
694         for (cb = ctx->callbacks; cb; cb = next) {
695                 next = cb->next;
696                 if ((cb->msg_type == msg_type)
697                     && (cb->private_data == private_data)) {
698                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
699                                   (unsigned)msg_type, private_data));
700                         DLIST_REMOVE(ctx->callbacks, cb);
701                         TALLOC_FREE(cb);
702                 }
703         }
704 }
705
706 /*
707   Send a message to a particular server
708 */
709 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
710                         struct server_id server, uint32_t msg_type,
711                         const DATA_BLOB *data)
712 {
713         struct iovec iov = {0};
714
715         if (data != NULL) {
716                 iov.iov_base = data->data;
717                 iov.iov_len = data->length;
718         };
719
720         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
721 }
722
723 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
724                             struct server_id server, uint32_t msg_type,
725                             const uint8_t *buf, size_t len)
726 {
727         DATA_BLOB blob = data_blob_const(buf, len);
728         return messaging_send(msg_ctx, server, msg_type, &blob);
729 }
730
731 static int messaging_post_self(struct messaging_context *msg_ctx,
732                                struct server_id src, struct server_id dst,
733                                uint32_t msg_type,
734                                const struct iovec *iov, int iovlen,
735                                const int *fds, size_t num_fds)
736 {
737         struct messaging_rec *rec;
738         bool ok;
739
740         rec = messaging_rec_create(
741                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
742         if (rec == NULL) {
743                 return ENOMEM;
744         }
745
746         ok = messaging_alert_event_contexts(msg_ctx);
747         if (!ok) {
748                 TALLOC_FREE(rec);
749                 return ENOMEM;
750         }
751
752         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
753
754         return 0;
755 }
756
757 int messaging_send_iov_from(struct messaging_context *msg_ctx,
758                             struct server_id src, struct server_id dst,
759                             uint32_t msg_type,
760                             const struct iovec *iov, int iovlen,
761                             const int *fds, size_t num_fds)
762 {
763         int ret;
764         uint8_t hdr[MESSAGE_HDR_LENGTH];
765         struct iovec iov2[iovlen+1];
766
767         if (server_id_is_disconnected(&dst)) {
768                 return EINVAL;
769         }
770
771         if (num_fds > INT8_MAX) {
772                 return EINVAL;
773         }
774
775         if (dst.vnn != msg_ctx->id.vnn) {
776                 if (num_fds > 0) {
777                         return ENOSYS;
778                 }
779
780                 ret = msg_ctx->remote->send_fn(src, dst,
781                                                msg_type, iov, iovlen,
782                                                NULL, 0,
783                                                msg_ctx->remote);
784                 return ret;
785         }
786
787         if (server_id_equal(&dst, &msg_ctx->id)) {
788                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
789                                           iov, iovlen, fds, num_fds);
790                 return ret;
791         }
792
793         message_hdr_put(hdr, msg_type, src, dst);
794         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
795         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
796
797         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
798
799         if (ret == EACCES) {
800                 become_root();
801                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
802                                          fds, num_fds);
803                 unbecome_root();
804         }
805
806         if (ret == ECONNREFUSED) {
807                 /*
808                  * Linux returns this when a socket exists in the file
809                  * system without a listening process. This is not
810                  * documented in susv4 or the linux manpages, but it's
811                  * easily testable. For the higher levels this is the
812                  * same as "destination does not exist"
813                  */
814                 ret = ENOENT;
815         }
816
817         return ret;
818 }
819
820 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
821                             struct server_id dst, uint32_t msg_type,
822                             const struct iovec *iov, int iovlen,
823                             const int *fds, size_t num_fds)
824 {
825         int ret;
826
827         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
828                                       iov, iovlen, fds, num_fds);
829         if (ret != 0) {
830                 return map_nt_error_from_unix(ret);
831         }
832         return NT_STATUS_OK;
833 }
834
835 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
836                                                struct messaging_rec *rec)
837 {
838         struct messaging_rec *result;
839         size_t fds_size = sizeof(int64_t) * rec->num_fds;
840         size_t payload_len;
841
842         payload_len = rec->buf.length + fds_size;
843         if (payload_len < rec->buf.length) {
844                 /* overflow */
845                 return NULL;
846         }
847
848         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
849                                       payload_len);
850         if (result == NULL) {
851                 return NULL;
852         }
853         *result = *rec;
854
855         /* Doesn't fail, see talloc_pooled_object */
856
857         result->buf.data = talloc_memdup(result, rec->buf.data,
858                                          rec->buf.length);
859
860         result->fds = NULL;
861         if (result->num_fds > 0) {
862                 result->fds = talloc_memdup(result, rec->fds, fds_size);
863         }
864
865         return result;
866 }
867
868 struct messaging_filtered_read_state {
869         struct tevent_context *ev;
870         struct messaging_context *msg_ctx;
871         struct messaging_dgm_fde *fde;
872
873         bool (*filter)(struct messaging_rec *rec, void *private_data);
874         void *private_data;
875
876         struct messaging_rec *rec;
877 };
878
879 static void messaging_filtered_read_cleanup(struct tevent_req *req,
880                                             enum tevent_req_state req_state);
881
882 struct tevent_req *messaging_filtered_read_send(
883         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
884         struct messaging_context *msg_ctx,
885         bool (*filter)(struct messaging_rec *rec, void *private_data),
886         void *private_data)
887 {
888         struct tevent_req *req;
889         struct messaging_filtered_read_state *state;
890         size_t new_waiters_len;
891         bool ok;
892
893         req = tevent_req_create(mem_ctx, &state,
894                                 struct messaging_filtered_read_state);
895         if (req == NULL) {
896                 return NULL;
897         }
898         state->ev = ev;
899         state->msg_ctx = msg_ctx;
900         state->filter = filter;
901         state->private_data = private_data;
902
903         /*
904          * We have to defer the callback here, as we might be called from
905          * within a different tevent_context than state->ev
906          */
907         tevent_req_defer_callback(req, state->ev);
908
909         state->fde = messaging_dgm_register_tevent_context(state, ev);
910         if (tevent_req_nomem(state->fde, req)) {
911                 return tevent_req_post(req, ev);
912         }
913
914         /*
915          * We add ourselves to the "new_waiters" array, not the "waiters"
916          * array. If we are called from within messaging_read_done,
917          * messaging_dispatch_rec will be in an active for-loop on
918          * "waiters". We must be careful not to mess with this array, because
919          * it could mean that a single event is being delivered twice.
920          */
921
922         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
923
924         if (new_waiters_len == msg_ctx->num_new_waiters) {
925                 struct tevent_req **tmp;
926
927                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
928                                      struct tevent_req *, new_waiters_len+1);
929                 if (tevent_req_nomem(tmp, req)) {
930                         return tevent_req_post(req, ev);
931                 }
932                 msg_ctx->new_waiters = tmp;
933         }
934
935         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
936         msg_ctx->num_new_waiters += 1;
937         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
938
939         ok = messaging_register_event_context(msg_ctx, ev);
940         if (!ok) {
941                 tevent_req_oom(req);
942                 return tevent_req_post(req, ev);
943         }
944
945         return req;
946 }
947
948 static void messaging_filtered_read_cleanup(struct tevent_req *req,
949                                             enum tevent_req_state req_state)
950 {
951         struct messaging_filtered_read_state *state = tevent_req_data(
952                 req, struct messaging_filtered_read_state);
953         struct messaging_context *msg_ctx = state->msg_ctx;
954         size_t i;
955         bool ok;
956
957         tevent_req_set_cleanup_fn(req, NULL);
958
959         TALLOC_FREE(state->fde);
960
961         ok = messaging_deregister_event_context(msg_ctx, state->ev);
962         if (!ok) {
963                 abort();
964         }
965
966         /*
967          * Just set the [new_]waiters entry to NULL, be careful not to mess
968          * with the other "waiters" array contents. We are often called from
969          * within "messaging_dispatch_rec", which loops over
970          * "waiters". Messing with the "waiters" array will mess up that
971          * for-loop.
972          */
973
974         for (i=0; i<msg_ctx->num_waiters; i++) {
975                 if (msg_ctx->waiters[i] == req) {
976                         msg_ctx->waiters[i] = NULL;
977                         return;
978                 }
979         }
980
981         for (i=0; i<msg_ctx->num_new_waiters; i++) {
982                 if (msg_ctx->new_waiters[i] == req) {
983                         msg_ctx->new_waiters[i] = NULL;
984                         return;
985                 }
986         }
987 }
988
989 static void messaging_filtered_read_done(struct tevent_req *req,
990                                          struct messaging_rec *rec)
991 {
992         struct messaging_filtered_read_state *state = tevent_req_data(
993                 req, struct messaging_filtered_read_state);
994
995         state->rec = messaging_rec_dup(state, rec);
996         if (tevent_req_nomem(state->rec, req)) {
997                 return;
998         }
999         tevent_req_done(req);
1000 }
1001
1002 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1003                                  struct messaging_rec **presult)
1004 {
1005         struct messaging_filtered_read_state *state = tevent_req_data(
1006                 req, struct messaging_filtered_read_state);
1007         int err;
1008
1009         if (tevent_req_is_unix_error(req, &err)) {
1010                 tevent_req_received(req);
1011                 return err;
1012         }
1013         if (presult != NULL) {
1014                 *presult = talloc_move(mem_ctx, &state->rec);
1015         }
1016         return 0;
1017 }
1018
1019 struct messaging_read_state {
1020         uint32_t msg_type;
1021         struct messaging_rec *rec;
1022 };
1023
1024 static bool messaging_read_filter(struct messaging_rec *rec,
1025                                   void *private_data);
1026 static void messaging_read_done(struct tevent_req *subreq);
1027
1028 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1029                                        struct tevent_context *ev,
1030                                        struct messaging_context *msg,
1031                                        uint32_t msg_type)
1032 {
1033         struct tevent_req *req, *subreq;
1034         struct messaging_read_state *state;
1035
1036         req = tevent_req_create(mem_ctx, &state,
1037                                 struct messaging_read_state);
1038         if (req == NULL) {
1039                 return NULL;
1040         }
1041         state->msg_type = msg_type;
1042
1043         subreq = messaging_filtered_read_send(state, ev, msg,
1044                                               messaging_read_filter, state);
1045         if (tevent_req_nomem(subreq, req)) {
1046                 return tevent_req_post(req, ev);
1047         }
1048         tevent_req_set_callback(subreq, messaging_read_done, req);
1049         return req;
1050 }
1051
1052 static bool messaging_read_filter(struct messaging_rec *rec,
1053                                   void *private_data)
1054 {
1055         struct messaging_read_state *state = talloc_get_type_abort(
1056                 private_data, struct messaging_read_state);
1057
1058         if (rec->num_fds != 0) {
1059                 return false;
1060         }
1061
1062         return rec->msg_type == state->msg_type;
1063 }
1064
1065 static void messaging_read_done(struct tevent_req *subreq)
1066 {
1067         struct tevent_req *req = tevent_req_callback_data(
1068                 subreq, struct tevent_req);
1069         struct messaging_read_state *state = tevent_req_data(
1070                 req, struct messaging_read_state);
1071         int ret;
1072
1073         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1074         TALLOC_FREE(subreq);
1075         if (tevent_req_error(req, ret)) {
1076                 return;
1077         }
1078         tevent_req_done(req);
1079 }
1080
1081 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1082                         struct messaging_rec **presult)
1083 {
1084         struct messaging_read_state *state = tevent_req_data(
1085                 req, struct messaging_read_state);
1086         int err;
1087
1088         if (tevent_req_is_unix_error(req, &err)) {
1089                 return err;
1090         }
1091         if (presult != NULL) {
1092                 *presult = talloc_move(mem_ctx, &state->rec);
1093         }
1094         return 0;
1095 }
1096
1097 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1098 {
1099         if (msg_ctx->num_new_waiters == 0) {
1100                 return true;
1101         }
1102
1103         if (talloc_array_length(msg_ctx->waiters) <
1104             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1105                 struct tevent_req **tmp;
1106                 tmp = talloc_realloc(
1107                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1108                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1109                 if (tmp == NULL) {
1110                         DEBUG(1, ("%s: talloc failed\n", __func__));
1111                         return false;
1112                 }
1113                 msg_ctx->waiters = tmp;
1114         }
1115
1116         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1117                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1118
1119         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1120         msg_ctx->num_new_waiters = 0;
1121
1122         return true;
1123 }
1124
1125 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1126                                        struct messaging_rec *rec)
1127 {
1128         struct messaging_callback *cb, *next;
1129
1130         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1131                 size_t j;
1132
1133                 next = cb->next;
1134                 if (cb->msg_type != rec->msg_type) {
1135                         continue;
1136                 }
1137
1138                 /*
1139                  * the old style callbacks don't support fd passing
1140                  */
1141                 for (j=0; j < rec->num_fds; j++) {
1142                         int fd = rec->fds[j];
1143                         close(fd);
1144                 }
1145                 rec->num_fds = 0;
1146                 rec->fds = NULL;
1147
1148                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1149                        rec->src, &rec->buf);
1150
1151                 return true;
1152         }
1153
1154         return false;
1155 }
1156
1157 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1158                                        struct tevent_context *ev,
1159                                        struct messaging_rec *rec)
1160 {
1161         size_t i;
1162
1163         if (!messaging_append_new_waiters(msg_ctx)) {
1164                 return false;
1165         }
1166
1167         i = 0;
1168         while (i < msg_ctx->num_waiters) {
1169                 struct tevent_req *req;
1170                 struct messaging_filtered_read_state *state;
1171
1172                 req = msg_ctx->waiters[i];
1173                 if (req == NULL) {
1174                         /*
1175                          * This got cleaned up. In the meantime,
1176                          * move everything down one. We need
1177                          * to keep the order of waiters, as
1178                          * other code may depend on this.
1179                          */
1180                         if (i < msg_ctx->num_waiters - 1) {
1181                                 memmove(&msg_ctx->waiters[i],
1182                                         &msg_ctx->waiters[i+1],
1183                                         sizeof(struct tevent_req *) *
1184                                             (msg_ctx->num_waiters - i - 1));
1185                         }
1186                         msg_ctx->num_waiters -= 1;
1187                         continue;
1188                 }
1189
1190                 state = tevent_req_data(
1191                         req, struct messaging_filtered_read_state);
1192                 if ((ev == state->ev) &&
1193                     state->filter(rec, state->private_data)) {
1194                         messaging_filtered_read_done(req, rec);
1195                         return true;
1196                 }
1197
1198                 i += 1;
1199         }
1200
1201         return false;
1202 }
1203
1204 /*
1205   Dispatch one messaging_rec
1206 */
1207 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1208                                    struct tevent_context *ev,
1209                                    struct messaging_rec *rec)
1210 {
1211         bool consumed;
1212         size_t i;
1213
1214         if (ev == msg_ctx->event_ctx) {
1215                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1216                 if (consumed) {
1217                         return;
1218                 }
1219         }
1220
1221         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1222         if (consumed) {
1223                 return;
1224         }
1225
1226         if (ev != msg_ctx->event_ctx) {
1227                 struct iovec iov;
1228                 int fds[rec->num_fds];
1229                 int ret;
1230
1231                 /*
1232                  * We've been listening on a nested event
1233                  * context. Messages need to be handled in the main
1234                  * event context, so post to ourselves
1235                  */
1236
1237                 iov.iov_base = rec->buf.data;
1238                 iov.iov_len = rec->buf.length;
1239
1240                 for (i=0; i<rec->num_fds; i++) {
1241                         fds[i] = rec->fds[i];
1242                 }
1243
1244                 ret = messaging_post_self(
1245                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1246                         &iov, 1, fds, rec->num_fds);
1247                 if (ret == 0) {
1248                         return;
1249                 }
1250         }
1251
1252         /*
1253          * If the fd-array isn't used, just close it.
1254          */
1255         for (i=0; i < rec->num_fds; i++) {
1256                 int fd = rec->fds[i];
1257                 close(fd);
1258         }
1259         rec->num_fds = 0;
1260         rec->fds = NULL;
1261 }
1262
1263 static int mess_parent_dgm_cleanup(void *private_data);
1264 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1265
1266 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1267 {
1268         struct tevent_req *req;
1269
1270         req = background_job_send(
1271                 msg, msg->event_ctx, msg, NULL, 0,
1272                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1273                             60*15),
1274                 mess_parent_dgm_cleanup, msg);
1275         if (req == NULL) {
1276                 return false;
1277         }
1278         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1279         return true;
1280 }
1281
1282 static int mess_parent_dgm_cleanup(void *private_data)
1283 {
1284         int ret;
1285
1286         ret = messaging_dgm_wipe();
1287         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1288                    ret ? strerror(ret) : "ok"));
1289         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1290                            60*15);
1291 }
1292
1293 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1294 {
1295         struct messaging_context *msg = tevent_req_callback_data(
1296                 req, struct messaging_context);
1297         NTSTATUS status;
1298
1299         status = background_job_recv(req);
1300         TALLOC_FREE(req);
1301         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1302                   nt_errstr(status)));
1303
1304         req = background_job_send(
1305                 msg, msg->event_ctx, msg, NULL, 0,
1306                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1307                             60*15),
1308                 mess_parent_dgm_cleanup, msg);
1309         if (req == NULL) {
1310                 DEBUG(1, ("background_job_send failed\n"));
1311                 return;
1312         }
1313         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1314 }
1315
1316 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1317 {
1318         int ret;
1319
1320         if (pid == 0) {
1321                 ret = messaging_dgm_wipe();
1322         } else {
1323                 ret = messaging_dgm_cleanup(pid);
1324         }
1325
1326         return ret;
1327 }
1328
1329 struct tevent_context *messaging_tevent_context(
1330         struct messaging_context *msg_ctx)
1331 {
1332         return msg_ctx->event_ctx;
1333 }
1334
1335 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1336 {
1337         return msg_ctx->names_db;
1338 }
1339
1340 /** @} **/