messaging: Don't do self-sends in messaging_send_all
[nivanova/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
65
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
69
70 struct messaging_callback {
71         struct messaging_callback *prev, *next;
72         uint32_t msg_type;
73         void (*fn)(struct messaging_context *msg, void *private_data, 
74                    uint32_t msg_type, 
75                    struct server_id server_id, DATA_BLOB *data);
76         void *private_data;
77 };
78
79 struct messaging_registered_ev {
80         struct tevent_context *ev;
81         struct tevent_immediate *im;
82         size_t refcount;
83 };
84
85 struct messaging_context {
86         struct server_id id;
87         struct tevent_context *event_ctx;
88         struct messaging_callback *callbacks;
89
90         struct messaging_rec *posted_msgs;
91
92         struct messaging_registered_ev *event_contexts;
93
94         struct tevent_req **new_waiters;
95         size_t num_new_waiters;
96
97         struct tevent_req **waiters;
98         size_t num_waiters;
99
100         void *msg_dgm_ref;
101         void *msg_ctdb_ref;
102
103         struct server_id_db *names_db;
104 };
105
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107                                                struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109                                        struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111                                        struct tevent_context *ev,
112                                        struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114                                    struct tevent_context *ev,
115                                    struct messaging_rec *rec);
116
117 /****************************************************************************
118  A useful function for testing the message system.
119 ****************************************************************************/
120
121 static void ping_message(struct messaging_context *msg_ctx,
122                          void *private_data,
123                          uint32_t msg_type,
124                          struct server_id src,
125                          DATA_BLOB *data)
126 {
127         struct server_id_buf idbuf;
128
129         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130                   server_id_str_buf(src, &idbuf), (int)data->length,
131                   data->data ? (char *)data->data : ""));
132
133         messaging_send(msg_ctx, src, MSG_PONG, data);
134 }
135
136 struct messaging_rec *messaging_rec_create(
137         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138         uint32_t msg_type, const struct iovec *iov, int iovlen,
139         const int *fds, size_t num_fds)
140 {
141         ssize_t buflen;
142         uint8_t *buf;
143         struct messaging_rec *result;
144
145         if (num_fds > INT8_MAX) {
146                 return NULL;
147         }
148
149         buflen = iov_buflen(iov, iovlen);
150         if (buflen == -1) {
151                 return NULL;
152         }
153         buf = talloc_array(mem_ctx, uint8_t, buflen);
154         if (buf == NULL) {
155                 return NULL;
156         }
157         iov_buf(iov, iovlen, buf, buflen);
158
159         {
160                 struct messaging_rec rec;
161                 int64_t fds64[num_fds];
162                 size_t i;
163
164                 for (i=0; i<num_fds; i++) {
165                         fds64[i] = fds[i];
166                 }
167
168                 rec = (struct messaging_rec) {
169                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170                         .src = src, .dest = dst,
171                         .buf.data = buf, .buf.length = buflen,
172                         .num_fds = num_fds, .fds = fds64,
173                 };
174
175                 result = messaging_rec_dup(mem_ctx, &rec);
176         }
177
178         TALLOC_FREE(buf);
179
180         return result;
181 }
182
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184                                              struct tevent_context *ev)
185 {
186         size_t i, num_event_contexts;
187         struct messaging_registered_ev *free_reg = NULL;
188         struct messaging_registered_ev *tmp;
189
190         num_event_contexts = talloc_array_length(ctx->event_contexts);
191
192         for (i=0; i<num_event_contexts; i++) {
193                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194
195                 if (reg->ev == ev) {
196                         reg->refcount += 1;
197                         return true;
198                 }
199                 if (reg->refcount == 0) {
200                         if (reg->ev != NULL) {
201                                 abort();
202                         }
203                         free_reg = reg;
204                 }
205         }
206
207         if (free_reg == NULL) {
208                 tmp = talloc_realloc(ctx, ctx->event_contexts,
209                                      struct messaging_registered_ev,
210                                      num_event_contexts+1);
211                 if (tmp == NULL) {
212                         return false;
213                 }
214                 ctx->event_contexts = tmp;
215
216                 free_reg = &ctx->event_contexts[num_event_contexts];
217         }
218
219         *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
220
221         return true;
222 }
223
224 static bool messaging_deregister_event_context(struct messaging_context *ctx,
225                                                struct tevent_context *ev)
226 {
227         size_t i, num_event_contexts;
228
229         num_event_contexts = talloc_array_length(ctx->event_contexts);
230
231         for (i=0; i<num_event_contexts; i++) {
232                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
233
234                 if (reg->ev == ev) {
235                         if (reg->refcount == 0) {
236                                 return false;
237                         }
238                         reg->refcount -= 1;
239
240                         if (reg->refcount == 0) {
241                                 /*
242                                  * Not strictly necessary, just
243                                  * paranoia
244                                  */
245                                 reg->ev = NULL;
246
247                                 /*
248                                  * Do not talloc_free(reg->im),
249                                  * recycle immediates events.
250                                  */
251                         }
252                         return true;
253                 }
254         }
255         return false;
256 }
257
258 static void messaging_post_main_event_context(struct tevent_context *ev,
259                                               struct tevent_immediate *im,
260                                               void *private_data)
261 {
262         struct messaging_context *ctx = talloc_get_type_abort(
263                 private_data, struct messaging_context);
264
265         while (ctx->posted_msgs != NULL) {
266                 struct messaging_rec *rec = ctx->posted_msgs;
267                 bool consumed;
268
269                 DLIST_REMOVE(ctx->posted_msgs, rec);
270
271                 consumed = messaging_dispatch_classic(ctx, rec);
272                 if (!consumed) {
273                         consumed = messaging_dispatch_waiters(
274                                 ctx, ctx->event_ctx, rec);
275                 }
276
277                 if (!consumed) {
278                         uint8_t i;
279
280                         for (i=0; i<rec->num_fds; i++) {
281                                 close(rec->fds[i]);
282                         }
283                 }
284
285                 TALLOC_FREE(rec);
286         }
287 }
288
289 static void messaging_post_sub_event_context(struct tevent_context *ev,
290                                              struct tevent_immediate *im,
291                                              void *private_data)
292 {
293         struct messaging_context *ctx = talloc_get_type_abort(
294                 private_data, struct messaging_context);
295         struct messaging_rec *rec, *next;
296
297         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
298                 bool consumed;
299
300                 next = rec->next;
301
302                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
303                 if (consumed) {
304                         DLIST_REMOVE(ctx->posted_msgs, rec);
305                         TALLOC_FREE(rec);
306                 }
307         }
308 }
309
310 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
311 {
312         size_t i, num_event_contexts;
313
314         num_event_contexts = talloc_array_length(ctx->event_contexts);
315
316         for (i=0; i<num_event_contexts; i++) {
317                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
318
319                 if (reg->refcount == 0) {
320                         continue;
321                 }
322
323                 if (reg->im == NULL) {
324                         reg->im = tevent_create_immediate(
325                                 ctx->event_contexts);
326                 }
327                 if (reg->im == NULL) {
328                         DBG_WARNING("Could not create immediate\n");
329                         continue;
330                 }
331
332                 /*
333                  * We depend on schedule_immediate to work
334                  * multiple times. Might be a bit inefficient,
335                  * but this needs to be proven in tests. The
336                  * alternatively would be to track whether the
337                  * immediate has already been scheduled. For
338                  * now, avoid that complexity here.
339                  */
340
341                 if (reg->ev == ctx->event_ctx) {
342                         tevent_schedule_immediate(
343                                 reg->im, reg->ev,
344                                 messaging_post_main_event_context,
345                                 ctx);
346                 } else {
347                         tevent_schedule_immediate(
348                                 reg->im, reg->ev,
349                                 messaging_post_sub_event_context,
350                                 ctx);
351                 }
352
353         }
354         return true;
355 }
356
357 static void messaging_recv_cb(struct tevent_context *ev,
358                               const uint8_t *msg, size_t msg_len,
359                               int *fds, size_t num_fds,
360                               void *private_data)
361 {
362         struct messaging_context *msg_ctx = talloc_get_type_abort(
363                 private_data, struct messaging_context);
364         struct server_id_buf idbuf;
365         struct messaging_rec rec;
366         int64_t fds64[MIN(num_fds, INT8_MAX)];
367         size_t i;
368
369         if (msg_len < MESSAGE_HDR_LENGTH) {
370                 DBG_WARNING("message too short: %zu\n", msg_len);
371                 goto close_fail;
372         }
373
374         if (num_fds > INT8_MAX) {
375                 DBG_WARNING("too many fds: %zu\n", num_fds);
376                 goto close_fail;
377         }
378
379         /*
380          * "consume" the fds by copying them and setting
381          * the original variable to -1
382          */
383         for (i=0; i < num_fds; i++) {
384                 fds64[i] = fds[i];
385                 fds[i] = -1;
386         }
387
388         rec = (struct messaging_rec) {
389                 .msg_version = MESSAGE_VERSION,
390                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
391                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
392                 .num_fds = num_fds,
393                 .fds = fds64,
394         };
395
396         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
397
398         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
399                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
400                   server_id_str_buf(rec.src, &idbuf));
401
402         messaging_dispatch_rec(msg_ctx, ev, &rec);
403         return;
404
405 close_fail:
406         for (i=0; i < num_fds; i++) {
407                 close(fds[i]);
408         }
409 }
410
411 static int messaging_context_destructor(struct messaging_context *ctx)
412 {
413         size_t i;
414
415         for (i=0; i<ctx->num_new_waiters; i++) {
416                 if (ctx->new_waiters[i] != NULL) {
417                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
418                         ctx->new_waiters[i] = NULL;
419                 }
420         }
421         for (i=0; i<ctx->num_waiters; i++) {
422                 if (ctx->waiters[i] != NULL) {
423                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
424                         ctx->waiters[i] = NULL;
425                 }
426         }
427
428         /*
429          * The immediates from messaging_alert_event_contexts
430          * reference "ctx". Don't let them outlive the
431          * messaging_context we're destroying here.
432          */
433         TALLOC_FREE(ctx->event_contexts);
434
435         return 0;
436 }
437
438 static const char *private_path(const char *name)
439 {
440         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
441 }
442
443 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
444                                         struct tevent_context *ev,
445                                         struct messaging_context **pmsg_ctx)
446 {
447         TALLOC_CTX *frame;
448         struct messaging_context *ctx;
449         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
450         int ret;
451         const char *lck_path;
452         const char *priv_path;
453         bool ok;
454
455         lck_path = lock_path("msg.lock");
456         if (lck_path == NULL) {
457                 return NT_STATUS_NO_MEMORY;
458         }
459
460         ok = directory_create_or_exist_strict(lck_path,
461                                               sec_initial_uid(),
462                                               0755);
463         if (!ok) {
464                 DBG_DEBUG("Could not create lock directory: %s\n",
465                           strerror(errno));
466                 return NT_STATUS_ACCESS_DENIED;
467         }
468
469         priv_path = private_path("msg.sock");
470         if (priv_path == NULL) {
471                 return NT_STATUS_NO_MEMORY;
472         }
473
474         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
475                                               0700);
476         if (!ok) {
477                 DBG_DEBUG("Could not create msg directory: %s\n",
478                           strerror(errno));
479                 return NT_STATUS_ACCESS_DENIED;
480         }
481
482         frame = talloc_stackframe();
483         if (frame == NULL) {
484                 return NT_STATUS_NO_MEMORY;
485         }
486
487         ctx = talloc_zero(frame, struct messaging_context);
488         if (ctx == NULL) {
489                 status = NT_STATUS_NO_MEMORY;
490                 goto done;
491         }
492
493         ctx->id = (struct server_id) {
494                 .pid = getpid(), .vnn = NONCLUSTER_VNN
495         };
496
497         ctx->event_ctx = ev;
498
499         ok = messaging_register_event_context(ctx, ev);
500         if (!ok) {
501                 status = NT_STATUS_NO_MEMORY;
502                 goto done;
503         }
504
505         sec_init();
506
507         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
508                                              ctx->event_ctx,
509                                              &ctx->id.unique_id,
510                                              priv_path,
511                                              lck_path,
512                                              messaging_recv_cb,
513                                              ctx,
514                                              &ret);
515         if (ctx->msg_dgm_ref == NULL) {
516                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
517                 status = map_nt_error_from_unix(ret);
518                 goto done;
519         }
520         talloc_set_destructor(ctx, messaging_context_destructor);
521
522 #ifdef CLUSTER_SUPPORT
523         if (lp_clustering()) {
524                 ctx->msg_ctdb_ref = messaging_ctdb_ref(
525                         ctx, ctx->event_ctx,
526                         lp_ctdbd_socket(), lp_ctdb_timeout(),
527                         ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
528                 if (ctx->msg_ctdb_ref == NULL) {
529                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
530                                    strerror(ret));
531                         status = map_nt_error_from_unix(ret);
532                         goto done;
533                 }
534         }
535 #endif
536
537         ctx->id.vnn = get_my_vnn();
538
539         ctx->names_db = server_id_db_init(ctx,
540                                           ctx->id,
541                                           lp_lock_directory(),
542                                           0,
543                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
544         if (ctx->names_db == NULL) {
545                 DBG_DEBUG("server_id_db_init failed\n");
546                 status = NT_STATUS_NO_MEMORY;
547                 goto done;
548         }
549
550         messaging_register(ctx, NULL, MSG_PING, ping_message);
551
552         /* Register some debugging related messages */
553
554         register_msg_pool_usage(ctx);
555         register_dmalloc_msgs(ctx);
556         debug_register_msgs(ctx);
557
558         {
559                 struct server_id_buf tmp;
560                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
561         }
562
563         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
564
565         status = NT_STATUS_OK;
566 done:
567         TALLOC_FREE(frame);
568
569         return status;
570 }
571
572 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
573                                          struct tevent_context *ev)
574 {
575         struct messaging_context *ctx = NULL;
576         NTSTATUS status;
577
578         status = messaging_init_internal(mem_ctx,
579                                          ev,
580                                          &ctx);
581         if (!NT_STATUS_IS_OK(status)) {
582                 return NULL;
583         }
584
585         return ctx;
586 }
587
588 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
589                                struct tevent_context *ev,
590                                struct messaging_context **pmsg_ctx)
591 {
592         return messaging_init_internal(mem_ctx,
593                                         ev,
594                                         pmsg_ctx);
595 }
596
597 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
598 {
599         return msg_ctx->id;
600 }
601
602 /*
603  * re-init after a fork
604  */
605 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
606 {
607         int ret;
608         char *lck_path;
609
610         TALLOC_FREE(msg_ctx->msg_dgm_ref);
611         TALLOC_FREE(msg_ctx->msg_ctdb_ref);
612
613         msg_ctx->id = (struct server_id) {
614                 .pid = getpid(), .vnn = msg_ctx->id.vnn
615         };
616
617         lck_path = lock_path("msg.lock");
618         if (lck_path == NULL) {
619                 return NT_STATUS_NO_MEMORY;
620         }
621
622         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
623                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
624                 private_path("msg.sock"), lck_path,
625                 messaging_recv_cb, msg_ctx, &ret);
626
627         if (msg_ctx->msg_dgm_ref == NULL) {
628                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
629                 return map_nt_error_from_unix(ret);
630         }
631
632         if (lp_clustering()) {
633                 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
634                         msg_ctx, msg_ctx->event_ctx,
635                         lp_ctdbd_socket(), lp_ctdb_timeout(),
636                         msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
637                         &ret);
638                 if (msg_ctx->msg_ctdb_ref == NULL) {
639                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
640                                    strerror(ret));
641                         return map_nt_error_from_unix(ret);
642                 }
643         }
644
645         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
646
647         return NT_STATUS_OK;
648 }
649
650
651 /*
652  * Register a dispatch function for a particular message type. Allow multiple
653  * registrants
654 */
655 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
656                             void *private_data,
657                             uint32_t msg_type,
658                             void (*fn)(struct messaging_context *msg,
659                                        void *private_data, 
660                                        uint32_t msg_type, 
661                                        struct server_id server_id,
662                                        DATA_BLOB *data))
663 {
664         struct messaging_callback *cb;
665
666         DEBUG(5, ("Registering messaging pointer for type %u - "
667                   "private_data=%p\n",
668                   (unsigned)msg_type, private_data));
669
670         /*
671          * Only one callback per type
672          */
673
674         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
675                 /* we allow a second registration of the same message
676                    type if it has a different private pointer. This is
677                    needed in, for example, the internal notify code,
678                    which creates a new notify context for each tree
679                    connect, and expects to receive messages to each of
680                    them. */
681                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
682                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
683                                   (unsigned)msg_type, private_data));
684                         cb->fn = fn;
685                         cb->private_data = private_data;
686                         return NT_STATUS_OK;
687                 }
688         }
689
690         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
691                 return NT_STATUS_NO_MEMORY;
692         }
693
694         cb->msg_type = msg_type;
695         cb->fn = fn;
696         cb->private_data = private_data;
697
698         DLIST_ADD(msg_ctx->callbacks, cb);
699         return NT_STATUS_OK;
700 }
701
702 /*
703   De-register the function for a particular message type.
704 */
705 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
706                           void *private_data)
707 {
708         struct messaging_callback *cb, *next;
709
710         for (cb = ctx->callbacks; cb; cb = next) {
711                 next = cb->next;
712                 if ((cb->msg_type == msg_type)
713                     && (cb->private_data == private_data)) {
714                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
715                                   (unsigned)msg_type, private_data));
716                         DLIST_REMOVE(ctx->callbacks, cb);
717                         TALLOC_FREE(cb);
718                 }
719         }
720 }
721
722 /*
723   Send a message to a particular server
724 */
725 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
726                         struct server_id server, uint32_t msg_type,
727                         const DATA_BLOB *data)
728 {
729         struct iovec iov = {0};
730
731         if (data != NULL) {
732                 iov.iov_base = data->data;
733                 iov.iov_len = data->length;
734         };
735
736         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
737 }
738
739 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
740                             struct server_id server, uint32_t msg_type,
741                             const uint8_t *buf, size_t len)
742 {
743         DATA_BLOB blob = data_blob_const(buf, len);
744         return messaging_send(msg_ctx, server, msg_type, &blob);
745 }
746
747 static int messaging_post_self(struct messaging_context *msg_ctx,
748                                struct server_id src, struct server_id dst,
749                                uint32_t msg_type,
750                                const struct iovec *iov, int iovlen,
751                                const int *fds, size_t num_fds)
752 {
753         struct messaging_rec *rec;
754         bool ok;
755
756         rec = messaging_rec_create(
757                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
758         if (rec == NULL) {
759                 return ENOMEM;
760         }
761
762         ok = messaging_alert_event_contexts(msg_ctx);
763         if (!ok) {
764                 TALLOC_FREE(rec);
765                 return ENOMEM;
766         }
767
768         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
769
770         return 0;
771 }
772
773 int messaging_send_iov_from(struct messaging_context *msg_ctx,
774                             struct server_id src, struct server_id dst,
775                             uint32_t msg_type,
776                             const struct iovec *iov, int iovlen,
777                             const int *fds, size_t num_fds)
778 {
779         int ret;
780         uint8_t hdr[MESSAGE_HDR_LENGTH];
781         struct iovec iov2[iovlen+1];
782
783         if (server_id_is_disconnected(&dst)) {
784                 return EINVAL;
785         }
786
787         if (num_fds > INT8_MAX) {
788                 return EINVAL;
789         }
790
791         if (server_id_equal(&dst, &msg_ctx->id)) {
792                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
793                                           iov, iovlen, fds, num_fds);
794                 return ret;
795         }
796
797         message_hdr_put(hdr, msg_type, src, dst);
798         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
799         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
800
801         if (dst.vnn != msg_ctx->id.vnn) {
802                 if (num_fds > 0) {
803                         return ENOSYS;
804                 }
805
806                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
807                 return ret;
808         }
809
810         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
811
812         if (ret == EACCES) {
813                 become_root();
814                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
815                                          fds, num_fds);
816                 unbecome_root();
817         }
818
819         if (ret == ECONNREFUSED) {
820                 /*
821                  * Linux returns this when a socket exists in the file
822                  * system without a listening process. This is not
823                  * documented in susv4 or the linux manpages, but it's
824                  * easily testable. For the higher levels this is the
825                  * same as "destination does not exist"
826                  */
827                 ret = ENOENT;
828         }
829
830         return ret;
831 }
832
833 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
834                             struct server_id dst, uint32_t msg_type,
835                             const struct iovec *iov, int iovlen,
836                             const int *fds, size_t num_fds)
837 {
838         int ret;
839
840         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
841                                       iov, iovlen, fds, num_fds);
842         if (ret != 0) {
843                 return map_nt_error_from_unix(ret);
844         }
845         return NT_STATUS_OK;
846 }
847
848 struct send_all_state {
849         struct messaging_context *msg_ctx;
850         int msg_type;
851         const void *buf;
852         size_t len;
853 };
854
855 static int send_all_fn(pid_t pid, void *private_data)
856 {
857         struct send_all_state *state = private_data;
858         NTSTATUS status;
859
860         if (pid == getpid()) {
861                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
862                 return 0;
863         }
864
865         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
866                                     state->msg_type, state->buf, state->len);
867         if (!NT_STATUS_IS_OK(status)) {
868                 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
869                             (uintmax_t)pid, nt_errstr(status));
870         }
871
872         return 0;
873 }
874
875 void messaging_send_all(struct messaging_context *msg_ctx,
876                         int msg_type, const void *buf, size_t len)
877 {
878         struct send_all_state state = {
879                 .msg_ctx = msg_ctx, .msg_type = msg_type,
880                 .buf = buf, .len = len
881         };
882         int ret;
883
884 #ifdef CLUSTER_SUPPORT
885         if (lp_clustering()) {
886                 struct ctdbd_connection *conn = messaging_ctdb_connection();
887                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
888                 struct iovec iov[] = {
889                         { .iov_base = msghdr,
890                           .iov_len = sizeof(msghdr) },
891                         { .iov_base = discard_const_p(void, buf),
892                           .iov_len = len }
893                 };
894
895                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
896                                 (struct server_id) {0});
897
898                 ret = ctdbd_messaging_send_iov(
899                         conn, CTDB_BROADCAST_CONNECTED,
900                         CTDB_SRVID_SAMBA_PROCESS,
901                         iov, ARRAY_SIZE(iov));
902                 if (ret != 0) {
903                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
904                                     strerror(ret));
905                 }
906
907                 return;
908         }
909 #endif
910
911         ret = messaging_dgm_forall(send_all_fn, &state);
912         if (ret != 0) {
913                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
914                             strerror(ret));
915         }
916 }
917
918 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
919                                                struct messaging_rec *rec)
920 {
921         struct messaging_rec *result;
922         size_t fds_size = sizeof(int64_t) * rec->num_fds;
923         size_t payload_len;
924
925         payload_len = rec->buf.length + fds_size;
926         if (payload_len < rec->buf.length) {
927                 /* overflow */
928                 return NULL;
929         }
930
931         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
932                                       payload_len);
933         if (result == NULL) {
934                 return NULL;
935         }
936         *result = *rec;
937
938         /* Doesn't fail, see talloc_pooled_object */
939
940         result->buf.data = talloc_memdup(result, rec->buf.data,
941                                          rec->buf.length);
942
943         result->fds = NULL;
944         if (result->num_fds > 0) {
945                 result->fds = talloc_memdup(result, rec->fds, fds_size);
946         }
947
948         return result;
949 }
950
951 struct messaging_filtered_read_state {
952         struct tevent_context *ev;
953         struct messaging_context *msg_ctx;
954         struct messaging_dgm_fde *fde;
955         struct messaging_ctdb_fde *cluster_fde;
956
957         bool (*filter)(struct messaging_rec *rec, void *private_data);
958         void *private_data;
959
960         struct messaging_rec *rec;
961 };
962
963 static void messaging_filtered_read_cleanup(struct tevent_req *req,
964                                             enum tevent_req_state req_state);
965
966 struct tevent_req *messaging_filtered_read_send(
967         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
968         struct messaging_context *msg_ctx,
969         bool (*filter)(struct messaging_rec *rec, void *private_data),
970         void *private_data)
971 {
972         struct tevent_req *req;
973         struct messaging_filtered_read_state *state;
974         size_t new_waiters_len;
975         bool ok;
976
977         req = tevent_req_create(mem_ctx, &state,
978                                 struct messaging_filtered_read_state);
979         if (req == NULL) {
980                 return NULL;
981         }
982         state->ev = ev;
983         state->msg_ctx = msg_ctx;
984         state->filter = filter;
985         state->private_data = private_data;
986
987         /*
988          * We have to defer the callback here, as we might be called from
989          * within a different tevent_context than state->ev
990          */
991         tevent_req_defer_callback(req, state->ev);
992
993         state->fde = messaging_dgm_register_tevent_context(state, ev);
994         if (tevent_req_nomem(state->fde, req)) {
995                 return tevent_req_post(req, ev);
996         }
997
998         if (lp_clustering()) {
999                 state->cluster_fde =
1000                         messaging_ctdb_register_tevent_context(state, ev);
1001                 if (tevent_req_nomem(state->cluster_fde, req)) {
1002                         return tevent_req_post(req, ev);
1003                 }
1004         }
1005
1006         /*
1007          * We add ourselves to the "new_waiters" array, not the "waiters"
1008          * array. If we are called from within messaging_read_done,
1009          * messaging_dispatch_rec will be in an active for-loop on
1010          * "waiters". We must be careful not to mess with this array, because
1011          * it could mean that a single event is being delivered twice.
1012          */
1013
1014         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1015
1016         if (new_waiters_len == msg_ctx->num_new_waiters) {
1017                 struct tevent_req **tmp;
1018
1019                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1020                                      struct tevent_req *, new_waiters_len+1);
1021                 if (tevent_req_nomem(tmp, req)) {
1022                         return tevent_req_post(req, ev);
1023                 }
1024                 msg_ctx->new_waiters = tmp;
1025         }
1026
1027         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1028         msg_ctx->num_new_waiters += 1;
1029         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1030
1031         ok = messaging_register_event_context(msg_ctx, ev);
1032         if (!ok) {
1033                 tevent_req_oom(req);
1034                 return tevent_req_post(req, ev);
1035         }
1036
1037         return req;
1038 }
1039
1040 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1041                                             enum tevent_req_state req_state)
1042 {
1043         struct messaging_filtered_read_state *state = tevent_req_data(
1044                 req, struct messaging_filtered_read_state);
1045         struct messaging_context *msg_ctx = state->msg_ctx;
1046         size_t i;
1047         bool ok;
1048
1049         tevent_req_set_cleanup_fn(req, NULL);
1050
1051         TALLOC_FREE(state->fde);
1052         TALLOC_FREE(state->cluster_fde);
1053
1054         ok = messaging_deregister_event_context(msg_ctx, state->ev);
1055         if (!ok) {
1056                 abort();
1057         }
1058
1059         /*
1060          * Just set the [new_]waiters entry to NULL, be careful not to mess
1061          * with the other "waiters" array contents. We are often called from
1062          * within "messaging_dispatch_rec", which loops over
1063          * "waiters". Messing with the "waiters" array will mess up that
1064          * for-loop.
1065          */
1066
1067         for (i=0; i<msg_ctx->num_waiters; i++) {
1068                 if (msg_ctx->waiters[i] == req) {
1069                         msg_ctx->waiters[i] = NULL;
1070                         return;
1071                 }
1072         }
1073
1074         for (i=0; i<msg_ctx->num_new_waiters; i++) {
1075                 if (msg_ctx->new_waiters[i] == req) {
1076                         msg_ctx->new_waiters[i] = NULL;
1077                         return;
1078                 }
1079         }
1080 }
1081
1082 static void messaging_filtered_read_done(struct tevent_req *req,
1083                                          struct messaging_rec *rec)
1084 {
1085         struct messaging_filtered_read_state *state = tevent_req_data(
1086                 req, struct messaging_filtered_read_state);
1087
1088         state->rec = messaging_rec_dup(state, rec);
1089         if (tevent_req_nomem(state->rec, req)) {
1090                 return;
1091         }
1092         tevent_req_done(req);
1093 }
1094
1095 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1096                                  struct messaging_rec **presult)
1097 {
1098         struct messaging_filtered_read_state *state = tevent_req_data(
1099                 req, struct messaging_filtered_read_state);
1100         int err;
1101
1102         if (tevent_req_is_unix_error(req, &err)) {
1103                 tevent_req_received(req);
1104                 return err;
1105         }
1106         if (presult != NULL) {
1107                 *presult = talloc_move(mem_ctx, &state->rec);
1108         }
1109         return 0;
1110 }
1111
1112 struct messaging_read_state {
1113         uint32_t msg_type;
1114         struct messaging_rec *rec;
1115 };
1116
1117 static bool messaging_read_filter(struct messaging_rec *rec,
1118                                   void *private_data);
1119 static void messaging_read_done(struct tevent_req *subreq);
1120
1121 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1122                                        struct tevent_context *ev,
1123                                        struct messaging_context *msg,
1124                                        uint32_t msg_type)
1125 {
1126         struct tevent_req *req, *subreq;
1127         struct messaging_read_state *state;
1128
1129         req = tevent_req_create(mem_ctx, &state,
1130                                 struct messaging_read_state);
1131         if (req == NULL) {
1132                 return NULL;
1133         }
1134         state->msg_type = msg_type;
1135
1136         subreq = messaging_filtered_read_send(state, ev, msg,
1137                                               messaging_read_filter, state);
1138         if (tevent_req_nomem(subreq, req)) {
1139                 return tevent_req_post(req, ev);
1140         }
1141         tevent_req_set_callback(subreq, messaging_read_done, req);
1142         return req;
1143 }
1144
1145 static bool messaging_read_filter(struct messaging_rec *rec,
1146                                   void *private_data)
1147 {
1148         struct messaging_read_state *state = talloc_get_type_abort(
1149                 private_data, struct messaging_read_state);
1150
1151         if (rec->num_fds != 0) {
1152                 return false;
1153         }
1154
1155         return rec->msg_type == state->msg_type;
1156 }
1157
1158 static void messaging_read_done(struct tevent_req *subreq)
1159 {
1160         struct tevent_req *req = tevent_req_callback_data(
1161                 subreq, struct tevent_req);
1162         struct messaging_read_state *state = tevent_req_data(
1163                 req, struct messaging_read_state);
1164         int ret;
1165
1166         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1167         TALLOC_FREE(subreq);
1168         if (tevent_req_error(req, ret)) {
1169                 return;
1170         }
1171         tevent_req_done(req);
1172 }
1173
1174 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1175                         struct messaging_rec **presult)
1176 {
1177         struct messaging_read_state *state = tevent_req_data(
1178                 req, struct messaging_read_state);
1179         int err;
1180
1181         if (tevent_req_is_unix_error(req, &err)) {
1182                 return err;
1183         }
1184         if (presult != NULL) {
1185                 *presult = talloc_move(mem_ctx, &state->rec);
1186         }
1187         return 0;
1188 }
1189
1190 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1191 {
1192         if (msg_ctx->num_new_waiters == 0) {
1193                 return true;
1194         }
1195
1196         if (talloc_array_length(msg_ctx->waiters) <
1197             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1198                 struct tevent_req **tmp;
1199                 tmp = talloc_realloc(
1200                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1201                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1202                 if (tmp == NULL) {
1203                         DEBUG(1, ("%s: talloc failed\n", __func__));
1204                         return false;
1205                 }
1206                 msg_ctx->waiters = tmp;
1207         }
1208
1209         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1210                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1211
1212         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1213         msg_ctx->num_new_waiters = 0;
1214
1215         return true;
1216 }
1217
1218 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1219                                        struct messaging_rec *rec)
1220 {
1221         struct messaging_callback *cb, *next;
1222
1223         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1224                 size_t j;
1225
1226                 next = cb->next;
1227                 if (cb->msg_type != rec->msg_type) {
1228                         continue;
1229                 }
1230
1231                 /*
1232                  * the old style callbacks don't support fd passing
1233                  */
1234                 for (j=0; j < rec->num_fds; j++) {
1235                         int fd = rec->fds[j];
1236                         close(fd);
1237                 }
1238                 rec->num_fds = 0;
1239                 rec->fds = NULL;
1240
1241                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1242                        rec->src, &rec->buf);
1243
1244                 return true;
1245         }
1246
1247         return false;
1248 }
1249
1250 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1251                                        struct tevent_context *ev,
1252                                        struct messaging_rec *rec)
1253 {
1254         size_t i;
1255
1256         if (!messaging_append_new_waiters(msg_ctx)) {
1257                 return false;
1258         }
1259
1260         i = 0;
1261         while (i < msg_ctx->num_waiters) {
1262                 struct tevent_req *req;
1263                 struct messaging_filtered_read_state *state;
1264
1265                 req = msg_ctx->waiters[i];
1266                 if (req == NULL) {
1267                         /*
1268                          * This got cleaned up. In the meantime,
1269                          * move everything down one. We need
1270                          * to keep the order of waiters, as
1271                          * other code may depend on this.
1272                          */
1273                         if (i < msg_ctx->num_waiters - 1) {
1274                                 memmove(&msg_ctx->waiters[i],
1275                                         &msg_ctx->waiters[i+1],
1276                                         sizeof(struct tevent_req *) *
1277                                             (msg_ctx->num_waiters - i - 1));
1278                         }
1279                         msg_ctx->num_waiters -= 1;
1280                         continue;
1281                 }
1282
1283                 state = tevent_req_data(
1284                         req, struct messaging_filtered_read_state);
1285                 if ((ev == state->ev) &&
1286                     state->filter(rec, state->private_data)) {
1287                         messaging_filtered_read_done(req, rec);
1288                         return true;
1289                 }
1290
1291                 i += 1;
1292         }
1293
1294         return false;
1295 }
1296
1297 /*
1298   Dispatch one messaging_rec
1299 */
1300 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1301                                    struct tevent_context *ev,
1302                                    struct messaging_rec *rec)
1303 {
1304         bool consumed;
1305         size_t i;
1306
1307         if (ev == msg_ctx->event_ctx) {
1308                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1309                 if (consumed) {
1310                         return;
1311                 }
1312         }
1313
1314         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1315         if (consumed) {
1316                 return;
1317         }
1318
1319         if (ev != msg_ctx->event_ctx) {
1320                 struct iovec iov;
1321                 int fds[rec->num_fds];
1322                 int ret;
1323
1324                 /*
1325                  * We've been listening on a nested event
1326                  * context. Messages need to be handled in the main
1327                  * event context, so post to ourselves
1328                  */
1329
1330                 iov.iov_base = rec->buf.data;
1331                 iov.iov_len = rec->buf.length;
1332
1333                 for (i=0; i<rec->num_fds; i++) {
1334                         fds[i] = rec->fds[i];
1335                 }
1336
1337                 ret = messaging_post_self(
1338                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1339                         &iov, 1, fds, rec->num_fds);
1340                 if (ret == 0) {
1341                         return;
1342                 }
1343         }
1344
1345         /*
1346          * If the fd-array isn't used, just close it.
1347          */
1348         for (i=0; i < rec->num_fds; i++) {
1349                 int fd = rec->fds[i];
1350                 close(fd);
1351         }
1352         rec->num_fds = 0;
1353         rec->fds = NULL;
1354 }
1355
1356 static int mess_parent_dgm_cleanup(void *private_data);
1357 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1358
1359 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1360 {
1361         struct tevent_req *req;
1362
1363         req = background_job_send(
1364                 msg, msg->event_ctx, msg, NULL, 0,
1365                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1366                             60*15),
1367                 mess_parent_dgm_cleanup, msg);
1368         if (req == NULL) {
1369                 DBG_WARNING("background_job_send failed\n");
1370                 return false;
1371         }
1372         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1373         return true;
1374 }
1375
1376 static int mess_parent_dgm_cleanup(void *private_data)
1377 {
1378         int ret;
1379
1380         ret = messaging_dgm_wipe();
1381         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1382                    ret ? strerror(ret) : "ok"));
1383         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1384                            60*15);
1385 }
1386
1387 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1388 {
1389         struct messaging_context *msg = tevent_req_callback_data(
1390                 req, struct messaging_context);
1391         NTSTATUS status;
1392
1393         status = background_job_recv(req);
1394         TALLOC_FREE(req);
1395         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1396                   nt_errstr(status)));
1397
1398         req = background_job_send(
1399                 msg, msg->event_ctx, msg, NULL, 0,
1400                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1401                             60*15),
1402                 mess_parent_dgm_cleanup, msg);
1403         if (req == NULL) {
1404                 DEBUG(1, ("background_job_send failed\n"));
1405                 return;
1406         }
1407         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1408 }
1409
1410 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1411 {
1412         int ret;
1413
1414         if (pid == 0) {
1415                 ret = messaging_dgm_wipe();
1416         } else {
1417                 ret = messaging_dgm_cleanup(pid);
1418         }
1419
1420         return ret;
1421 }
1422
1423 struct tevent_context *messaging_tevent_context(
1424         struct messaging_context *msg_ctx)
1425 {
1426         return msg_ctx->event_ctx;
1427 }
1428
1429 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1430 {
1431         return msg_ctx->names_db;
1432 }
1433
1434 /** @} **/