45e210ff75eeaa268d0563a37e177cae627633b9
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
65
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
69
70 struct messaging_callback {
71         struct messaging_callback *prev, *next;
72         uint32_t msg_type;
73         void (*fn)(struct messaging_context *msg, void *private_data, 
74                    uint32_t msg_type, 
75                    struct server_id server_id, DATA_BLOB *data);
76         void *private_data;
77 };
78
79 struct messaging_registered_ev {
80         struct tevent_context *ev;
81         struct tevent_immediate *im;
82         size_t refcount;
83 };
84
85 struct messaging_context {
86         struct server_id id;
87         struct tevent_context *event_ctx;
88         struct messaging_callback *callbacks;
89
90         struct messaging_rec *posted_msgs;
91
92         struct messaging_registered_ev *event_contexts;
93
94         struct tevent_req **new_waiters;
95         size_t num_new_waiters;
96
97         struct tevent_req **waiters;
98         size_t num_waiters;
99
100         void *msg_dgm_ref;
101         void *msg_ctdb_ref;
102
103         struct server_id_db *names_db;
104 };
105
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107                                                struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109                                        struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111                                        struct tevent_context *ev,
112                                        struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114                                    struct tevent_context *ev,
115                                    struct messaging_rec *rec);
116
117 /****************************************************************************
118  A useful function for testing the message system.
119 ****************************************************************************/
120
121 static void ping_message(struct messaging_context *msg_ctx,
122                          void *private_data,
123                          uint32_t msg_type,
124                          struct server_id src,
125                          DATA_BLOB *data)
126 {
127         struct server_id_buf idbuf;
128
129         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130                   server_id_str_buf(src, &idbuf), (int)data->length,
131                   data->data ? (char *)data->data : ""));
132
133         messaging_send(msg_ctx, src, MSG_PONG, data);
134 }
135
136 struct messaging_rec *messaging_rec_create(
137         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138         uint32_t msg_type, const struct iovec *iov, int iovlen,
139         const int *fds, size_t num_fds)
140 {
141         ssize_t buflen;
142         uint8_t *buf;
143         struct messaging_rec *result;
144
145         if (num_fds > INT8_MAX) {
146                 return NULL;
147         }
148
149         buflen = iov_buflen(iov, iovlen);
150         if (buflen == -1) {
151                 return NULL;
152         }
153         buf = talloc_array(mem_ctx, uint8_t, buflen);
154         if (buf == NULL) {
155                 return NULL;
156         }
157         iov_buf(iov, iovlen, buf, buflen);
158
159         {
160                 struct messaging_rec rec;
161                 int64_t fds64[num_fds];
162                 size_t i;
163
164                 for (i=0; i<num_fds; i++) {
165                         fds64[i] = fds[i];
166                 }
167
168                 rec = (struct messaging_rec) {
169                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170                         .src = src, .dest = dst,
171                         .buf.data = buf, .buf.length = buflen,
172                         .num_fds = num_fds, .fds = fds64,
173                 };
174
175                 result = messaging_rec_dup(mem_ctx, &rec);
176         }
177
178         TALLOC_FREE(buf);
179
180         return result;
181 }
182
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184                                              struct tevent_context *ev)
185 {
186         size_t i, num_event_contexts;
187         struct messaging_registered_ev *free_reg = NULL;
188         struct messaging_registered_ev *tmp;
189
190         num_event_contexts = talloc_array_length(ctx->event_contexts);
191
192         for (i=0; i<num_event_contexts; i++) {
193                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194
195                 if (reg->refcount == 0) {
196                         if (reg->ev != NULL) {
197                                 abort();
198                         }
199                         free_reg = reg;
200                         /*
201                          * We continue here and may find another
202                          * free_req, but the important thing is
203                          * that we continue to search for an
204                          * existing registration in the loop.
205                          */
206                         continue;
207                 }
208
209                 if (reg->ev == ev) {
210                         reg->refcount += 1;
211                         return true;
212                 }
213         }
214
215         if (free_reg == NULL) {
216                 tmp = talloc_realloc(ctx, ctx->event_contexts,
217                                      struct messaging_registered_ev,
218                                      num_event_contexts+1);
219                 if (tmp == NULL) {
220                         return false;
221                 }
222                 ctx->event_contexts = tmp;
223
224                 free_reg = &ctx->event_contexts[num_event_contexts];
225         }
226
227         *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
228
229         return true;
230 }
231
232 static bool messaging_deregister_event_context(struct messaging_context *ctx,
233                                                struct tevent_context *ev)
234 {
235         size_t i, num_event_contexts;
236
237         num_event_contexts = talloc_array_length(ctx->event_contexts);
238
239         for (i=0; i<num_event_contexts; i++) {
240                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
241
242                 if (reg->refcount == 0) {
243                         continue;
244                 }
245
246                 if (reg->ev == ev) {
247                         reg->refcount -= 1;
248
249                         if (reg->refcount == 0) {
250                                 /*
251                                  * Not strictly necessary, just
252                                  * paranoia
253                                  */
254                                 reg->ev = NULL;
255
256                                 /*
257                                  * Do not talloc_free(reg->im),
258                                  * recycle immediates events.
259                                  */
260                         }
261                         return true;
262                 }
263         }
264         return false;
265 }
266
267 static void messaging_post_main_event_context(struct tevent_context *ev,
268                                               struct tevent_immediate *im,
269                                               void *private_data)
270 {
271         struct messaging_context *ctx = talloc_get_type_abort(
272                 private_data, struct messaging_context);
273
274         while (ctx->posted_msgs != NULL) {
275                 struct messaging_rec *rec = ctx->posted_msgs;
276                 bool consumed;
277
278                 DLIST_REMOVE(ctx->posted_msgs, rec);
279
280                 consumed = messaging_dispatch_classic(ctx, rec);
281                 if (!consumed) {
282                         consumed = messaging_dispatch_waiters(
283                                 ctx, ctx->event_ctx, rec);
284                 }
285
286                 if (!consumed) {
287                         uint8_t i;
288
289                         for (i=0; i<rec->num_fds; i++) {
290                                 close(rec->fds[i]);
291                         }
292                 }
293
294                 TALLOC_FREE(rec);
295         }
296 }
297
298 static void messaging_post_sub_event_context(struct tevent_context *ev,
299                                              struct tevent_immediate *im,
300                                              void *private_data)
301 {
302         struct messaging_context *ctx = talloc_get_type_abort(
303                 private_data, struct messaging_context);
304         struct messaging_rec *rec, *next;
305
306         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
307                 bool consumed;
308
309                 next = rec->next;
310
311                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
312                 if (consumed) {
313                         DLIST_REMOVE(ctx->posted_msgs, rec);
314                         TALLOC_FREE(rec);
315                 }
316         }
317 }
318
319 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
320 {
321         size_t i, num_event_contexts;
322
323         num_event_contexts = talloc_array_length(ctx->event_contexts);
324
325         for (i=0; i<num_event_contexts; i++) {
326                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
327
328                 if (reg->refcount == 0) {
329                         continue;
330                 }
331
332                 if (reg->im == NULL) {
333                         reg->im = tevent_create_immediate(
334                                 ctx->event_contexts);
335                 }
336                 if (reg->im == NULL) {
337                         DBG_WARNING("Could not create immediate\n");
338                         continue;
339                 }
340
341                 /*
342                  * We depend on schedule_immediate to work
343                  * multiple times. Might be a bit inefficient,
344                  * but this needs to be proven in tests. The
345                  * alternatively would be to track whether the
346                  * immediate has already been scheduled. For
347                  * now, avoid that complexity here.
348                  */
349
350                 if (reg->ev == ctx->event_ctx) {
351                         tevent_schedule_immediate(
352                                 reg->im, reg->ev,
353                                 messaging_post_main_event_context,
354                                 ctx);
355                 } else {
356                         tevent_schedule_immediate(
357                                 reg->im, reg->ev,
358                                 messaging_post_sub_event_context,
359                                 ctx);
360                 }
361
362         }
363         return true;
364 }
365
366 static void messaging_recv_cb(struct tevent_context *ev,
367                               const uint8_t *msg, size_t msg_len,
368                               int *fds, size_t num_fds,
369                               void *private_data)
370 {
371         struct messaging_context *msg_ctx = talloc_get_type_abort(
372                 private_data, struct messaging_context);
373         struct server_id_buf idbuf;
374         struct messaging_rec rec;
375         int64_t fds64[MIN(num_fds, INT8_MAX)];
376         size_t i;
377
378         if (msg_len < MESSAGE_HDR_LENGTH) {
379                 DBG_WARNING("message too short: %zu\n", msg_len);
380                 goto close_fail;
381         }
382
383         if (num_fds > INT8_MAX) {
384                 DBG_WARNING("too many fds: %zu\n", num_fds);
385                 goto close_fail;
386         }
387
388         /*
389          * "consume" the fds by copying them and setting
390          * the original variable to -1
391          */
392         for (i=0; i < num_fds; i++) {
393                 fds64[i] = fds[i];
394                 fds[i] = -1;
395         }
396
397         rec = (struct messaging_rec) {
398                 .msg_version = MESSAGE_VERSION,
399                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
400                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
401                 .num_fds = num_fds,
402                 .fds = fds64,
403         };
404
405         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
406
407         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
408                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
409                   server_id_str_buf(rec.src, &idbuf));
410
411         if (server_id_same_process(&rec.src, &msg_ctx->id)) {
412                 DBG_DEBUG("Ignoring self-send\n");
413                 goto close_fail;
414         }
415
416         messaging_dispatch_rec(msg_ctx, ev, &rec);
417         return;
418
419 close_fail:
420         for (i=0; i < num_fds; i++) {
421                 close(fds[i]);
422         }
423 }
424
425 static int messaging_context_destructor(struct messaging_context *ctx)
426 {
427         size_t i;
428
429         for (i=0; i<ctx->num_new_waiters; i++) {
430                 if (ctx->new_waiters[i] != NULL) {
431                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
432                         ctx->new_waiters[i] = NULL;
433                 }
434         }
435         for (i=0; i<ctx->num_waiters; i++) {
436                 if (ctx->waiters[i] != NULL) {
437                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
438                         ctx->waiters[i] = NULL;
439                 }
440         }
441
442         /*
443          * The immediates from messaging_alert_event_contexts
444          * reference "ctx". Don't let them outlive the
445          * messaging_context we're destroying here.
446          */
447         TALLOC_FREE(ctx->event_contexts);
448
449         return 0;
450 }
451
452 static const char *private_path(const char *name)
453 {
454         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
455 }
456
457 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
458                                         struct tevent_context *ev,
459                                         struct messaging_context **pmsg_ctx)
460 {
461         TALLOC_CTX *frame;
462         struct messaging_context *ctx;
463         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
464         int ret;
465         const char *lck_path;
466         const char *priv_path;
467         bool ok;
468
469         /*
470          * sec_init() *must* be called before any other
471          * functions that use sec_XXX(). e.g. sec_initial_uid().
472          */
473
474         sec_init();
475
476         lck_path = lock_path("msg.lock");
477         if (lck_path == NULL) {
478                 return NT_STATUS_NO_MEMORY;
479         }
480
481         ok = directory_create_or_exist_strict(lck_path,
482                                               sec_initial_uid(),
483                                               0755);
484         if (!ok) {
485                 DBG_DEBUG("Could not create lock directory: %s\n",
486                           strerror(errno));
487                 return NT_STATUS_ACCESS_DENIED;
488         }
489
490         priv_path = private_path("msg.sock");
491         if (priv_path == NULL) {
492                 return NT_STATUS_NO_MEMORY;
493         }
494
495         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
496                                               0700);
497         if (!ok) {
498                 DBG_DEBUG("Could not create msg directory: %s\n",
499                           strerror(errno));
500                 return NT_STATUS_ACCESS_DENIED;
501         }
502
503         frame = talloc_stackframe();
504         if (frame == NULL) {
505                 return NT_STATUS_NO_MEMORY;
506         }
507
508         ctx = talloc_zero(frame, struct messaging_context);
509         if (ctx == NULL) {
510                 status = NT_STATUS_NO_MEMORY;
511                 goto done;
512         }
513
514         ctx->id = (struct server_id) {
515                 .pid = getpid(), .vnn = NONCLUSTER_VNN
516         };
517
518         ctx->event_ctx = ev;
519
520         ok = messaging_register_event_context(ctx, ev);
521         if (!ok) {
522                 status = NT_STATUS_NO_MEMORY;
523                 goto done;
524         }
525
526         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
527                                              ctx->event_ctx,
528                                              &ctx->id.unique_id,
529                                              priv_path,
530                                              lck_path,
531                                              messaging_recv_cb,
532                                              ctx,
533                                              &ret);
534         if (ctx->msg_dgm_ref == NULL) {
535                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
536                 status = map_nt_error_from_unix(ret);
537                 goto done;
538         }
539         talloc_set_destructor(ctx, messaging_context_destructor);
540
541 #ifdef CLUSTER_SUPPORT
542         if (lp_clustering()) {
543                 ctx->msg_ctdb_ref = messaging_ctdb_ref(
544                         ctx, ctx->event_ctx,
545                         lp_ctdbd_socket(), lp_ctdb_timeout(),
546                         ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
547                 if (ctx->msg_ctdb_ref == NULL) {
548                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
549                                    strerror(ret));
550                         status = map_nt_error_from_unix(ret);
551                         goto done;
552                 }
553         }
554 #endif
555
556         ctx->id.vnn = get_my_vnn();
557
558         ctx->names_db = server_id_db_init(ctx,
559                                           ctx->id,
560                                           lp_lock_directory(),
561                                           0,
562                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
563         if (ctx->names_db == NULL) {
564                 DBG_DEBUG("server_id_db_init failed\n");
565                 status = NT_STATUS_NO_MEMORY;
566                 goto done;
567         }
568
569         messaging_register(ctx, NULL, MSG_PING, ping_message);
570
571         /* Register some debugging related messages */
572
573         register_msg_pool_usage(ctx);
574         register_dmalloc_msgs(ctx);
575         debug_register_msgs(ctx);
576
577         {
578                 struct server_id_buf tmp;
579                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
580         }
581
582         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
583
584         status = NT_STATUS_OK;
585 done:
586         TALLOC_FREE(frame);
587
588         return status;
589 }
590
591 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
592                                          struct tevent_context *ev)
593 {
594         struct messaging_context *ctx = NULL;
595         NTSTATUS status;
596
597         status = messaging_init_internal(mem_ctx,
598                                          ev,
599                                          &ctx);
600         if (!NT_STATUS_IS_OK(status)) {
601                 return NULL;
602         }
603
604         return ctx;
605 }
606
607 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
608                                struct tevent_context *ev,
609                                struct messaging_context **pmsg_ctx)
610 {
611         return messaging_init_internal(mem_ctx,
612                                         ev,
613                                         pmsg_ctx);
614 }
615
616 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
617 {
618         return msg_ctx->id;
619 }
620
621 /*
622  * re-init after a fork
623  */
624 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
625 {
626         int ret;
627         char *lck_path;
628
629         TALLOC_FREE(msg_ctx->msg_dgm_ref);
630         TALLOC_FREE(msg_ctx->msg_ctdb_ref);
631
632         msg_ctx->id = (struct server_id) {
633                 .pid = getpid(), .vnn = msg_ctx->id.vnn
634         };
635
636         lck_path = lock_path("msg.lock");
637         if (lck_path == NULL) {
638                 return NT_STATUS_NO_MEMORY;
639         }
640
641         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
642                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
643                 private_path("msg.sock"), lck_path,
644                 messaging_recv_cb, msg_ctx, &ret);
645
646         if (msg_ctx->msg_dgm_ref == NULL) {
647                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
648                 return map_nt_error_from_unix(ret);
649         }
650
651         if (lp_clustering()) {
652                 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
653                         msg_ctx, msg_ctx->event_ctx,
654                         lp_ctdbd_socket(), lp_ctdb_timeout(),
655                         msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
656                         &ret);
657                 if (msg_ctx->msg_ctdb_ref == NULL) {
658                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
659                                    strerror(ret));
660                         return map_nt_error_from_unix(ret);
661                 }
662         }
663
664         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
665
666         return NT_STATUS_OK;
667 }
668
669
670 /*
671  * Register a dispatch function for a particular message type. Allow multiple
672  * registrants
673 */
674 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
675                             void *private_data,
676                             uint32_t msg_type,
677                             void (*fn)(struct messaging_context *msg,
678                                        void *private_data, 
679                                        uint32_t msg_type, 
680                                        struct server_id server_id,
681                                        DATA_BLOB *data))
682 {
683         struct messaging_callback *cb;
684
685         DEBUG(5, ("Registering messaging pointer for type %u - "
686                   "private_data=%p\n",
687                   (unsigned)msg_type, private_data));
688
689         /*
690          * Only one callback per type
691          */
692
693         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
694                 /* we allow a second registration of the same message
695                    type if it has a different private pointer. This is
696                    needed in, for example, the internal notify code,
697                    which creates a new notify context for each tree
698                    connect, and expects to receive messages to each of
699                    them. */
700                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
701                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
702                                   (unsigned)msg_type, private_data));
703                         cb->fn = fn;
704                         cb->private_data = private_data;
705                         return NT_STATUS_OK;
706                 }
707         }
708
709         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
710                 return NT_STATUS_NO_MEMORY;
711         }
712
713         cb->msg_type = msg_type;
714         cb->fn = fn;
715         cb->private_data = private_data;
716
717         DLIST_ADD(msg_ctx->callbacks, cb);
718         return NT_STATUS_OK;
719 }
720
721 /*
722   De-register the function for a particular message type.
723 */
724 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
725                           void *private_data)
726 {
727         struct messaging_callback *cb, *next;
728
729         for (cb = ctx->callbacks; cb; cb = next) {
730                 next = cb->next;
731                 if ((cb->msg_type == msg_type)
732                     && (cb->private_data == private_data)) {
733                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
734                                   (unsigned)msg_type, private_data));
735                         DLIST_REMOVE(ctx->callbacks, cb);
736                         TALLOC_FREE(cb);
737                 }
738         }
739 }
740
741 /*
742   Send a message to a particular server
743 */
744 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
745                         struct server_id server, uint32_t msg_type,
746                         const DATA_BLOB *data)
747 {
748         struct iovec iov = {0};
749
750         if (data != NULL) {
751                 iov.iov_base = data->data;
752                 iov.iov_len = data->length;
753         };
754
755         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
756 }
757
758 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
759                             struct server_id server, uint32_t msg_type,
760                             const uint8_t *buf, size_t len)
761 {
762         DATA_BLOB blob = data_blob_const(buf, len);
763         return messaging_send(msg_ctx, server, msg_type, &blob);
764 }
765
766 static int messaging_post_self(struct messaging_context *msg_ctx,
767                                struct server_id src, struct server_id dst,
768                                uint32_t msg_type,
769                                const struct iovec *iov, int iovlen,
770                                const int *fds, size_t num_fds)
771 {
772         struct messaging_rec *rec;
773         bool ok;
774
775         rec = messaging_rec_create(
776                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
777         if (rec == NULL) {
778                 return ENOMEM;
779         }
780
781         ok = messaging_alert_event_contexts(msg_ctx);
782         if (!ok) {
783                 TALLOC_FREE(rec);
784                 return ENOMEM;
785         }
786
787         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
788
789         return 0;
790 }
791
792 int messaging_send_iov_from(struct messaging_context *msg_ctx,
793                             struct server_id src, struct server_id dst,
794                             uint32_t msg_type,
795                             const struct iovec *iov, int iovlen,
796                             const int *fds, size_t num_fds)
797 {
798         int ret;
799         uint8_t hdr[MESSAGE_HDR_LENGTH];
800         struct iovec iov2[iovlen+1];
801
802         if (server_id_is_disconnected(&dst)) {
803                 return EINVAL;
804         }
805
806         if (num_fds > INT8_MAX) {
807                 return EINVAL;
808         }
809
810         if (server_id_equal(&dst, &msg_ctx->id)) {
811                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
812                                           iov, iovlen, fds, num_fds);
813                 return ret;
814         }
815
816         message_hdr_put(hdr, msg_type, src, dst);
817         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
818         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
819
820         if (dst.vnn != msg_ctx->id.vnn) {
821                 if (num_fds > 0) {
822                         return ENOSYS;
823                 }
824
825                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
826                 return ret;
827         }
828
829         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
830
831         if (ret == EACCES) {
832                 become_root();
833                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
834                                          fds, num_fds);
835                 unbecome_root();
836         }
837
838         if (ret == ECONNREFUSED) {
839                 /*
840                  * Linux returns this when a socket exists in the file
841                  * system without a listening process. This is not
842                  * documented in susv4 or the linux manpages, but it's
843                  * easily testable. For the higher levels this is the
844                  * same as "destination does not exist"
845                  */
846                 ret = ENOENT;
847         }
848
849         return ret;
850 }
851
852 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
853                             struct server_id dst, uint32_t msg_type,
854                             const struct iovec *iov, int iovlen,
855                             const int *fds, size_t num_fds)
856 {
857         int ret;
858
859         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
860                                       iov, iovlen, fds, num_fds);
861         if (ret != 0) {
862                 return map_nt_error_from_unix(ret);
863         }
864         return NT_STATUS_OK;
865 }
866
867 struct send_all_state {
868         struct messaging_context *msg_ctx;
869         int msg_type;
870         const void *buf;
871         size_t len;
872 };
873
874 static int send_all_fn(pid_t pid, void *private_data)
875 {
876         struct send_all_state *state = private_data;
877         NTSTATUS status;
878
879         if (pid == getpid()) {
880                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
881                 return 0;
882         }
883
884         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
885                                     state->msg_type, state->buf, state->len);
886         if (!NT_STATUS_IS_OK(status)) {
887                 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
888                             (uintmax_t)pid, nt_errstr(status));
889         }
890
891         return 0;
892 }
893
894 void messaging_send_all(struct messaging_context *msg_ctx,
895                         int msg_type, const void *buf, size_t len)
896 {
897         struct send_all_state state = {
898                 .msg_ctx = msg_ctx, .msg_type = msg_type,
899                 .buf = buf, .len = len
900         };
901         int ret;
902
903 #ifdef CLUSTER_SUPPORT
904         if (lp_clustering()) {
905                 struct ctdbd_connection *conn = messaging_ctdb_connection();
906                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
907                 struct iovec iov[] = {
908                         { .iov_base = msghdr,
909                           .iov_len = sizeof(msghdr) },
910                         { .iov_base = discard_const_p(void, buf),
911                           .iov_len = len }
912                 };
913
914                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
915                                 (struct server_id) {0});
916
917                 ret = ctdbd_messaging_send_iov(
918                         conn, CTDB_BROADCAST_CONNECTED,
919                         CTDB_SRVID_SAMBA_PROCESS,
920                         iov, ARRAY_SIZE(iov));
921                 if (ret != 0) {
922                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
923                                     strerror(ret));
924                 }
925
926                 return;
927         }
928 #endif
929
930         ret = messaging_dgm_forall(send_all_fn, &state);
931         if (ret != 0) {
932                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
933                             strerror(ret));
934         }
935 }
936
937 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
938                                                struct messaging_rec *rec)
939 {
940         struct messaging_rec *result;
941         size_t fds_size = sizeof(int64_t) * rec->num_fds;
942         size_t payload_len;
943
944         payload_len = rec->buf.length + fds_size;
945         if (payload_len < rec->buf.length) {
946                 /* overflow */
947                 return NULL;
948         }
949
950         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
951                                       payload_len);
952         if (result == NULL) {
953                 return NULL;
954         }
955         *result = *rec;
956
957         /* Doesn't fail, see talloc_pooled_object */
958
959         result->buf.data = talloc_memdup(result, rec->buf.data,
960                                          rec->buf.length);
961
962         result->fds = NULL;
963         if (result->num_fds > 0) {
964                 result->fds = talloc_memdup(result, rec->fds, fds_size);
965         }
966
967         return result;
968 }
969
970 struct messaging_filtered_read_state {
971         struct tevent_context *ev;
972         struct messaging_context *msg_ctx;
973         struct messaging_dgm_fde *fde;
974         struct messaging_ctdb_fde *cluster_fde;
975
976         bool (*filter)(struct messaging_rec *rec, void *private_data);
977         void *private_data;
978
979         struct messaging_rec *rec;
980 };
981
982 static void messaging_filtered_read_cleanup(struct tevent_req *req,
983                                             enum tevent_req_state req_state);
984
985 struct tevent_req *messaging_filtered_read_send(
986         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
987         struct messaging_context *msg_ctx,
988         bool (*filter)(struct messaging_rec *rec, void *private_data),
989         void *private_data)
990 {
991         struct tevent_req *req;
992         struct messaging_filtered_read_state *state;
993         size_t new_waiters_len;
994         bool ok;
995
996         req = tevent_req_create(mem_ctx, &state,
997                                 struct messaging_filtered_read_state);
998         if (req == NULL) {
999                 return NULL;
1000         }
1001         state->ev = ev;
1002         state->msg_ctx = msg_ctx;
1003         state->filter = filter;
1004         state->private_data = private_data;
1005
1006         /*
1007          * We have to defer the callback here, as we might be called from
1008          * within a different tevent_context than state->ev
1009          */
1010         tevent_req_defer_callback(req, state->ev);
1011
1012         state->fde = messaging_dgm_register_tevent_context(state, ev);
1013         if (tevent_req_nomem(state->fde, req)) {
1014                 return tevent_req_post(req, ev);
1015         }
1016
1017         if (lp_clustering()) {
1018                 state->cluster_fde =
1019                         messaging_ctdb_register_tevent_context(state, ev);
1020                 if (tevent_req_nomem(state->cluster_fde, req)) {
1021                         return tevent_req_post(req, ev);
1022                 }
1023         }
1024
1025         /*
1026          * We add ourselves to the "new_waiters" array, not the "waiters"
1027          * array. If we are called from within messaging_read_done,
1028          * messaging_dispatch_rec will be in an active for-loop on
1029          * "waiters". We must be careful not to mess with this array, because
1030          * it could mean that a single event is being delivered twice.
1031          */
1032
1033         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1034
1035         if (new_waiters_len == msg_ctx->num_new_waiters) {
1036                 struct tevent_req **tmp;
1037
1038                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1039                                      struct tevent_req *, new_waiters_len+1);
1040                 if (tevent_req_nomem(tmp, req)) {
1041                         return tevent_req_post(req, ev);
1042                 }
1043                 msg_ctx->new_waiters = tmp;
1044         }
1045
1046         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1047         msg_ctx->num_new_waiters += 1;
1048         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1049
1050         ok = messaging_register_event_context(msg_ctx, ev);
1051         if (!ok) {
1052                 tevent_req_oom(req);
1053                 return tevent_req_post(req, ev);
1054         }
1055
1056         return req;
1057 }
1058
1059 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1060                                             enum tevent_req_state req_state)
1061 {
1062         struct messaging_filtered_read_state *state = tevent_req_data(
1063                 req, struct messaging_filtered_read_state);
1064         struct messaging_context *msg_ctx = state->msg_ctx;
1065         size_t i;
1066         bool ok;
1067
1068         tevent_req_set_cleanup_fn(req, NULL);
1069
1070         TALLOC_FREE(state->fde);
1071         TALLOC_FREE(state->cluster_fde);
1072
1073         ok = messaging_deregister_event_context(msg_ctx, state->ev);
1074         if (!ok) {
1075                 abort();
1076         }
1077
1078         /*
1079          * Just set the [new_]waiters entry to NULL, be careful not to mess
1080          * with the other "waiters" array contents. We are often called from
1081          * within "messaging_dispatch_rec", which loops over
1082          * "waiters". Messing with the "waiters" array will mess up that
1083          * for-loop.
1084          */
1085
1086         for (i=0; i<msg_ctx->num_waiters; i++) {
1087                 if (msg_ctx->waiters[i] == req) {
1088                         msg_ctx->waiters[i] = NULL;
1089                         return;
1090                 }
1091         }
1092
1093         for (i=0; i<msg_ctx->num_new_waiters; i++) {
1094                 if (msg_ctx->new_waiters[i] == req) {
1095                         msg_ctx->new_waiters[i] = NULL;
1096                         return;
1097                 }
1098         }
1099 }
1100
1101 static void messaging_filtered_read_done(struct tevent_req *req,
1102                                          struct messaging_rec *rec)
1103 {
1104         struct messaging_filtered_read_state *state = tevent_req_data(
1105                 req, struct messaging_filtered_read_state);
1106
1107         state->rec = messaging_rec_dup(state, rec);
1108         if (tevent_req_nomem(state->rec, req)) {
1109                 return;
1110         }
1111         tevent_req_done(req);
1112 }
1113
1114 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1115                                  struct messaging_rec **presult)
1116 {
1117         struct messaging_filtered_read_state *state = tevent_req_data(
1118                 req, struct messaging_filtered_read_state);
1119         int err;
1120
1121         if (tevent_req_is_unix_error(req, &err)) {
1122                 tevent_req_received(req);
1123                 return err;
1124         }
1125         if (presult != NULL) {
1126                 *presult = talloc_move(mem_ctx, &state->rec);
1127         }
1128         return 0;
1129 }
1130
1131 struct messaging_read_state {
1132         uint32_t msg_type;
1133         struct messaging_rec *rec;
1134 };
1135
1136 static bool messaging_read_filter(struct messaging_rec *rec,
1137                                   void *private_data);
1138 static void messaging_read_done(struct tevent_req *subreq);
1139
1140 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1141                                        struct tevent_context *ev,
1142                                        struct messaging_context *msg,
1143                                        uint32_t msg_type)
1144 {
1145         struct tevent_req *req, *subreq;
1146         struct messaging_read_state *state;
1147
1148         req = tevent_req_create(mem_ctx, &state,
1149                                 struct messaging_read_state);
1150         if (req == NULL) {
1151                 return NULL;
1152         }
1153         state->msg_type = msg_type;
1154
1155         subreq = messaging_filtered_read_send(state, ev, msg,
1156                                               messaging_read_filter, state);
1157         if (tevent_req_nomem(subreq, req)) {
1158                 return tevent_req_post(req, ev);
1159         }
1160         tevent_req_set_callback(subreq, messaging_read_done, req);
1161         return req;
1162 }
1163
1164 static bool messaging_read_filter(struct messaging_rec *rec,
1165                                   void *private_data)
1166 {
1167         struct messaging_read_state *state = talloc_get_type_abort(
1168                 private_data, struct messaging_read_state);
1169
1170         if (rec->num_fds != 0) {
1171                 return false;
1172         }
1173
1174         return rec->msg_type == state->msg_type;
1175 }
1176
1177 static void messaging_read_done(struct tevent_req *subreq)
1178 {
1179         struct tevent_req *req = tevent_req_callback_data(
1180                 subreq, struct tevent_req);
1181         struct messaging_read_state *state = tevent_req_data(
1182                 req, struct messaging_read_state);
1183         int ret;
1184
1185         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1186         TALLOC_FREE(subreq);
1187         if (tevent_req_error(req, ret)) {
1188                 return;
1189         }
1190         tevent_req_done(req);
1191 }
1192
1193 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1194                         struct messaging_rec **presult)
1195 {
1196         struct messaging_read_state *state = tevent_req_data(
1197                 req, struct messaging_read_state);
1198         int err;
1199
1200         if (tevent_req_is_unix_error(req, &err)) {
1201                 return err;
1202         }
1203         if (presult != NULL) {
1204                 *presult = talloc_move(mem_ctx, &state->rec);
1205         }
1206         return 0;
1207 }
1208
1209 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1210 {
1211         if (msg_ctx->num_new_waiters == 0) {
1212                 return true;
1213         }
1214
1215         if (talloc_array_length(msg_ctx->waiters) <
1216             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1217                 struct tevent_req **tmp;
1218                 tmp = talloc_realloc(
1219                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1220                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1221                 if (tmp == NULL) {
1222                         DEBUG(1, ("%s: talloc failed\n", __func__));
1223                         return false;
1224                 }
1225                 msg_ctx->waiters = tmp;
1226         }
1227
1228         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1229                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1230
1231         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1232         msg_ctx->num_new_waiters = 0;
1233
1234         return true;
1235 }
1236
1237 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1238                                        struct messaging_rec *rec)
1239 {
1240         struct messaging_callback *cb, *next;
1241
1242         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1243                 size_t j;
1244
1245                 next = cb->next;
1246                 if (cb->msg_type != rec->msg_type) {
1247                         continue;
1248                 }
1249
1250                 /*
1251                  * the old style callbacks don't support fd passing
1252                  */
1253                 for (j=0; j < rec->num_fds; j++) {
1254                         int fd = rec->fds[j];
1255                         close(fd);
1256                 }
1257                 rec->num_fds = 0;
1258                 rec->fds = NULL;
1259
1260                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1261                        rec->src, &rec->buf);
1262
1263                 return true;
1264         }
1265
1266         return false;
1267 }
1268
1269 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1270                                        struct tevent_context *ev,
1271                                        struct messaging_rec *rec)
1272 {
1273         size_t i;
1274
1275         if (!messaging_append_new_waiters(msg_ctx)) {
1276                 return false;
1277         }
1278
1279         i = 0;
1280         while (i < msg_ctx->num_waiters) {
1281                 struct tevent_req *req;
1282                 struct messaging_filtered_read_state *state;
1283
1284                 req = msg_ctx->waiters[i];
1285                 if (req == NULL) {
1286                         /*
1287                          * This got cleaned up. In the meantime,
1288                          * move everything down one. We need
1289                          * to keep the order of waiters, as
1290                          * other code may depend on this.
1291                          */
1292                         if (i < msg_ctx->num_waiters - 1) {
1293                                 memmove(&msg_ctx->waiters[i],
1294                                         &msg_ctx->waiters[i+1],
1295                                         sizeof(struct tevent_req *) *
1296                                             (msg_ctx->num_waiters - i - 1));
1297                         }
1298                         msg_ctx->num_waiters -= 1;
1299                         continue;
1300                 }
1301
1302                 state = tevent_req_data(
1303                         req, struct messaging_filtered_read_state);
1304                 if ((ev == state->ev) &&
1305                     state->filter(rec, state->private_data)) {
1306                         messaging_filtered_read_done(req, rec);
1307                         return true;
1308                 }
1309
1310                 i += 1;
1311         }
1312
1313         return false;
1314 }
1315
1316 /*
1317   Dispatch one messaging_rec
1318 */
1319 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1320                                    struct tevent_context *ev,
1321                                    struct messaging_rec *rec)
1322 {
1323         bool consumed;
1324         size_t i;
1325
1326         if (ev == msg_ctx->event_ctx) {
1327                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1328                 if (consumed) {
1329                         return;
1330                 }
1331         }
1332
1333         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1334         if (consumed) {
1335                 return;
1336         }
1337
1338         if (ev != msg_ctx->event_ctx) {
1339                 struct iovec iov;
1340                 int fds[rec->num_fds];
1341                 int ret;
1342
1343                 /*
1344                  * We've been listening on a nested event
1345                  * context. Messages need to be handled in the main
1346                  * event context, so post to ourselves
1347                  */
1348
1349                 iov.iov_base = rec->buf.data;
1350                 iov.iov_len = rec->buf.length;
1351
1352                 for (i=0; i<rec->num_fds; i++) {
1353                         fds[i] = rec->fds[i];
1354                 }
1355
1356                 ret = messaging_post_self(
1357                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1358                         &iov, 1, fds, rec->num_fds);
1359                 if (ret == 0) {
1360                         return;
1361                 }
1362         }
1363
1364         /*
1365          * If the fd-array isn't used, just close it.
1366          */
1367         for (i=0; i < rec->num_fds; i++) {
1368                 int fd = rec->fds[i];
1369                 close(fd);
1370         }
1371         rec->num_fds = 0;
1372         rec->fds = NULL;
1373 }
1374
1375 static int mess_parent_dgm_cleanup(void *private_data);
1376 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1377
1378 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1379 {
1380         struct tevent_req *req;
1381
1382         req = background_job_send(
1383                 msg, msg->event_ctx, msg, NULL, 0,
1384                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1385                             60*15),
1386                 mess_parent_dgm_cleanup, msg);
1387         if (req == NULL) {
1388                 DBG_WARNING("background_job_send failed\n");
1389                 return false;
1390         }
1391         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1392         return true;
1393 }
1394
1395 static int mess_parent_dgm_cleanup(void *private_data)
1396 {
1397         int ret;
1398
1399         ret = messaging_dgm_wipe();
1400         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1401                    ret ? strerror(ret) : "ok"));
1402         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1403                            60*15);
1404 }
1405
1406 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1407 {
1408         struct messaging_context *msg = tevent_req_callback_data(
1409                 req, struct messaging_context);
1410         NTSTATUS status;
1411
1412         status = background_job_recv(req);
1413         TALLOC_FREE(req);
1414         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1415                   nt_errstr(status)));
1416
1417         req = background_job_send(
1418                 msg, msg->event_ctx, msg, NULL, 0,
1419                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1420                             60*15),
1421                 mess_parent_dgm_cleanup, msg);
1422         if (req == NULL) {
1423                 DEBUG(1, ("background_job_send failed\n"));
1424                 return;
1425         }
1426         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1427 }
1428
1429 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1430 {
1431         int ret;
1432
1433         if (pid == 0) {
1434                 ret = messaging_dgm_wipe();
1435         } else {
1436                 ret = messaging_dgm_cleanup(pid);
1437         }
1438
1439         return ret;
1440 }
1441
1442 struct tevent_context *messaging_tevent_context(
1443         struct messaging_context *msg_ctx)
1444 {
1445         return msg_ctx->event_ctx;
1446 }
1447
1448 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1449 {
1450         return msg_ctx->names_db;
1451 }
1452
1453 /** @} **/