libsmb: Pull up wire_flags calculation from open_internal
[garming/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
65
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
69
70 struct messaging_callback {
71         struct messaging_callback *prev, *next;
72         uint32_t msg_type;
73         void (*fn)(struct messaging_context *msg, void *private_data, 
74                    uint32_t msg_type, 
75                    struct server_id server_id, DATA_BLOB *data);
76         void *private_data;
77 };
78
79 struct messaging_registered_ev {
80         struct tevent_context *ev;
81         struct tevent_immediate *im;
82         size_t refcount;
83 };
84
85 struct messaging_context {
86         struct server_id id;
87         struct tevent_context *event_ctx;
88         struct messaging_callback *callbacks;
89
90         struct messaging_rec *posted_msgs;
91
92         struct messaging_registered_ev *event_contexts;
93
94         struct tevent_req **new_waiters;
95         size_t num_new_waiters;
96
97         struct tevent_req **waiters;
98         size_t num_waiters;
99
100         void *msg_dgm_ref;
101         void *msg_ctdb_ref;
102
103         struct server_id_db *names_db;
104 };
105
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107                                                struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109                                        struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111                                        struct tevent_context *ev,
112                                        struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114                                    struct tevent_context *ev,
115                                    struct messaging_rec *rec);
116
117 /****************************************************************************
118  A useful function for testing the message system.
119 ****************************************************************************/
120
121 static void ping_message(struct messaging_context *msg_ctx,
122                          void *private_data,
123                          uint32_t msg_type,
124                          struct server_id src,
125                          DATA_BLOB *data)
126 {
127         struct server_id_buf idbuf;
128
129         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130                   server_id_str_buf(src, &idbuf), (int)data->length,
131                   data->data ? (char *)data->data : ""));
132
133         messaging_send(msg_ctx, src, MSG_PONG, data);
134 }
135
136 struct messaging_rec *messaging_rec_create(
137         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138         uint32_t msg_type, const struct iovec *iov, int iovlen,
139         const int *fds, size_t num_fds)
140 {
141         ssize_t buflen;
142         uint8_t *buf;
143         struct messaging_rec *result;
144
145         if (num_fds > INT8_MAX) {
146                 return NULL;
147         }
148
149         buflen = iov_buflen(iov, iovlen);
150         if (buflen == -1) {
151                 return NULL;
152         }
153         buf = talloc_array(mem_ctx, uint8_t, buflen);
154         if (buf == NULL) {
155                 return NULL;
156         }
157         iov_buf(iov, iovlen, buf, buflen);
158
159         {
160                 struct messaging_rec rec;
161                 int64_t fds64[num_fds];
162                 size_t i;
163
164                 for (i=0; i<num_fds; i++) {
165                         fds64[i] = fds[i];
166                 }
167
168                 rec = (struct messaging_rec) {
169                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170                         .src = src, .dest = dst,
171                         .buf.data = buf, .buf.length = buflen,
172                         .num_fds = num_fds, .fds = fds64,
173                 };
174
175                 result = messaging_rec_dup(mem_ctx, &rec);
176         }
177
178         TALLOC_FREE(buf);
179
180         return result;
181 }
182
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184                                              struct tevent_context *ev)
185 {
186         size_t i, num_event_contexts;
187         struct messaging_registered_ev *free_reg = NULL;
188         struct messaging_registered_ev *tmp;
189
190         num_event_contexts = talloc_array_length(ctx->event_contexts);
191
192         for (i=0; i<num_event_contexts; i++) {
193                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194
195                 if (reg->refcount == 0) {
196                         if (reg->ev != NULL) {
197                                 abort();
198                         }
199                         free_reg = reg;
200                         /*
201                          * We continue here and may find another
202                          * free_req, but the important thing is
203                          * that we continue to search for an
204                          * existing registration in the loop.
205                          */
206                         continue;
207                 }
208
209                 if (reg->ev == ev) {
210                         reg->refcount += 1;
211                         return true;
212                 }
213         }
214
215         if (free_reg == NULL) {
216                 struct tevent_immediate *im = NULL;
217
218                 im = tevent_create_immediate(ctx);
219                 if (im == NULL) {
220                         return false;
221                 }
222
223                 tmp = talloc_realloc(ctx, ctx->event_contexts,
224                                      struct messaging_registered_ev,
225                                      num_event_contexts+1);
226                 if (tmp == NULL) {
227                         return false;
228                 }
229                 ctx->event_contexts = tmp;
230
231                 free_reg = &ctx->event_contexts[num_event_contexts];
232                 free_reg->im = talloc_move(ctx->event_contexts, &im);
233         }
234
235         /*
236          * free_reg->im might be cached
237          */
238         free_reg->ev = ev;
239         free_reg->refcount = 1;
240
241         return true;
242 }
243
244 static bool messaging_deregister_event_context(struct messaging_context *ctx,
245                                                struct tevent_context *ev)
246 {
247         size_t i, num_event_contexts;
248
249         num_event_contexts = talloc_array_length(ctx->event_contexts);
250
251         for (i=0; i<num_event_contexts; i++) {
252                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
253
254                 if (reg->refcount == 0) {
255                         continue;
256                 }
257
258                 if (reg->ev == ev) {
259                         reg->refcount -= 1;
260
261                         if (reg->refcount == 0) {
262                                 /*
263                                  * The primary event context
264                                  * is never unregistered using
265                                  * messaging_deregister_event_context()
266                                  * it's only registered using
267                                  * messaging_register_event_context().
268                                  */
269                                 SMB_ASSERT(ev != ctx->event_ctx);
270                                 SMB_ASSERT(reg->ev != ctx->event_ctx);
271
272                                 /*
273                                  * Not strictly necessary, just
274                                  * paranoia
275                                  */
276                                 reg->ev = NULL;
277
278                                 /*
279                                  * Do not talloc_free(reg->im),
280                                  * recycle immediates events.
281                                  *
282                                  * We just invalidate it using
283                                  * the primary event context,
284                                  * which is never unregistered.
285                                  */
286                                 tevent_schedule_immediate(reg->im,
287                                                           ctx->event_ctx,
288                                                           NULL, NULL);
289                         }
290                         return true;
291                 }
292         }
293         return false;
294 }
295
296 static void messaging_post_main_event_context(struct tevent_context *ev,
297                                               struct tevent_immediate *im,
298                                               void *private_data)
299 {
300         struct messaging_context *ctx = talloc_get_type_abort(
301                 private_data, struct messaging_context);
302
303         while (ctx->posted_msgs != NULL) {
304                 struct messaging_rec *rec = ctx->posted_msgs;
305                 bool consumed;
306
307                 DLIST_REMOVE(ctx->posted_msgs, rec);
308
309                 consumed = messaging_dispatch_classic(ctx, rec);
310                 if (!consumed) {
311                         consumed = messaging_dispatch_waiters(
312                                 ctx, ctx->event_ctx, rec);
313                 }
314
315                 if (!consumed) {
316                         uint8_t i;
317
318                         for (i=0; i<rec->num_fds; i++) {
319                                 close(rec->fds[i]);
320                         }
321                 }
322
323                 TALLOC_FREE(rec);
324         }
325 }
326
327 static void messaging_post_sub_event_context(struct tevent_context *ev,
328                                              struct tevent_immediate *im,
329                                              void *private_data)
330 {
331         struct messaging_context *ctx = talloc_get_type_abort(
332                 private_data, struct messaging_context);
333         struct messaging_rec *rec, *next;
334
335         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336                 bool consumed;
337
338                 next = rec->next;
339
340                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
341                 if (consumed) {
342                         DLIST_REMOVE(ctx->posted_msgs, rec);
343                         TALLOC_FREE(rec);
344                 }
345         }
346 }
347
348 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
349 {
350         size_t i, num_event_contexts;
351
352         num_event_contexts = talloc_array_length(ctx->event_contexts);
353
354         for (i=0; i<num_event_contexts; i++) {
355                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
356
357                 if (reg->refcount == 0) {
358                         continue;
359                 }
360
361                 /*
362                  * We depend on schedule_immediate to work
363                  * multiple times. Might be a bit inefficient,
364                  * but this needs to be proven in tests. The
365                  * alternatively would be to track whether the
366                  * immediate has already been scheduled. For
367                  * now, avoid that complexity here.
368                  */
369
370                 if (reg->ev == ctx->event_ctx) {
371                         tevent_schedule_immediate(
372                                 reg->im, reg->ev,
373                                 messaging_post_main_event_context,
374                                 ctx);
375                 } else {
376                         tevent_schedule_immediate(
377                                 reg->im, reg->ev,
378                                 messaging_post_sub_event_context,
379                                 ctx);
380                 }
381
382         }
383         return true;
384 }
385
386 static void messaging_recv_cb(struct tevent_context *ev,
387                               const uint8_t *msg, size_t msg_len,
388                               int *fds, size_t num_fds,
389                               void *private_data)
390 {
391         struct messaging_context *msg_ctx = talloc_get_type_abort(
392                 private_data, struct messaging_context);
393         struct server_id_buf idbuf;
394         struct messaging_rec rec;
395         int64_t fds64[MIN(num_fds, INT8_MAX)];
396         size_t i;
397
398         if (msg_len < MESSAGE_HDR_LENGTH) {
399                 DBG_WARNING("message too short: %zu\n", msg_len);
400                 goto close_fail;
401         }
402
403         if (num_fds > INT8_MAX) {
404                 DBG_WARNING("too many fds: %zu\n", num_fds);
405                 goto close_fail;
406         }
407
408         /*
409          * "consume" the fds by copying them and setting
410          * the original variable to -1
411          */
412         for (i=0; i < num_fds; i++) {
413                 fds64[i] = fds[i];
414                 fds[i] = -1;
415         }
416
417         rec = (struct messaging_rec) {
418                 .msg_version = MESSAGE_VERSION,
419                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
420                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
421                 .num_fds = num_fds,
422                 .fds = fds64,
423         };
424
425         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
426
427         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
428                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
429                   server_id_str_buf(rec.src, &idbuf));
430
431         if (server_id_same_process(&rec.src, &msg_ctx->id)) {
432                 DBG_DEBUG("Ignoring self-send\n");
433                 goto close_fail;
434         }
435
436         messaging_dispatch_rec(msg_ctx, ev, &rec);
437         return;
438
439 close_fail:
440         for (i=0; i < num_fds; i++) {
441                 close(fds[i]);
442         }
443 }
444
445 static int messaging_context_destructor(struct messaging_context *ctx)
446 {
447         size_t i;
448
449         for (i=0; i<ctx->num_new_waiters; i++) {
450                 if (ctx->new_waiters[i] != NULL) {
451                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
452                         ctx->new_waiters[i] = NULL;
453                 }
454         }
455         for (i=0; i<ctx->num_waiters; i++) {
456                 if (ctx->waiters[i] != NULL) {
457                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
458                         ctx->waiters[i] = NULL;
459                 }
460         }
461
462         /*
463          * The immediates from messaging_alert_event_contexts
464          * reference "ctx". Don't let them outlive the
465          * messaging_context we're destroying here.
466          */
467         TALLOC_FREE(ctx->event_contexts);
468
469         return 0;
470 }
471
472 static const char *private_path(const char *name)
473 {
474         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
475 }
476
477 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
478                                         struct tevent_context *ev,
479                                         struct messaging_context **pmsg_ctx)
480 {
481         TALLOC_CTX *frame;
482         struct messaging_context *ctx;
483         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
484         int ret;
485         const char *lck_path;
486         const char *priv_path;
487         bool ok;
488
489         /*
490          * sec_init() *must* be called before any other
491          * functions that use sec_XXX(). e.g. sec_initial_uid().
492          */
493
494         sec_init();
495
496         lck_path = lock_path(talloc_tos(), "msg.lock");
497         if (lck_path == NULL) {
498                 return NT_STATUS_NO_MEMORY;
499         }
500
501         ok = directory_create_or_exist_strict(lck_path,
502                                               sec_initial_uid(),
503                                               0755);
504         if (!ok) {
505                 DBG_DEBUG("Could not create lock directory: %s\n",
506                           strerror(errno));
507                 return NT_STATUS_ACCESS_DENIED;
508         }
509
510         priv_path = private_path("msg.sock");
511         if (priv_path == NULL) {
512                 return NT_STATUS_NO_MEMORY;
513         }
514
515         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
516                                               0700);
517         if (!ok) {
518                 DBG_DEBUG("Could not create msg directory: %s\n",
519                           strerror(errno));
520                 return NT_STATUS_ACCESS_DENIED;
521         }
522
523         frame = talloc_stackframe();
524         if (frame == NULL) {
525                 return NT_STATUS_NO_MEMORY;
526         }
527
528         ctx = talloc_zero(frame, struct messaging_context);
529         if (ctx == NULL) {
530                 status = NT_STATUS_NO_MEMORY;
531                 goto done;
532         }
533
534         ctx->id = (struct server_id) {
535                 .pid = getpid(), .vnn = NONCLUSTER_VNN
536         };
537
538         ctx->event_ctx = ev;
539
540         ok = messaging_register_event_context(ctx, ev);
541         if (!ok) {
542                 status = NT_STATUS_NO_MEMORY;
543                 goto done;
544         }
545
546         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
547                                              ctx->event_ctx,
548                                              &ctx->id.unique_id,
549                                              priv_path,
550                                              lck_path,
551                                              messaging_recv_cb,
552                                              ctx,
553                                              &ret);
554         if (ctx->msg_dgm_ref == NULL) {
555                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
556                 status = map_nt_error_from_unix(ret);
557                 goto done;
558         }
559         talloc_set_destructor(ctx, messaging_context_destructor);
560
561 #ifdef CLUSTER_SUPPORT
562         if (lp_clustering()) {
563                 ctx->msg_ctdb_ref = messaging_ctdb_ref(
564                         ctx, ctx->event_ctx,
565                         lp_ctdbd_socket(), lp_ctdb_timeout(),
566                         ctx->id.unique_id, messaging_recv_cb, ctx, &ret);
567                 if (ctx->msg_ctdb_ref == NULL) {
568                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
569                                    strerror(ret));
570                         status = map_nt_error_from_unix(ret);
571                         goto done;
572                 }
573         }
574 #endif
575
576         ctx->id.vnn = get_my_vnn();
577
578         ctx->names_db = server_id_db_init(ctx,
579                                           ctx->id,
580                                           lp_lock_directory(),
581                                           0,
582                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
583         if (ctx->names_db == NULL) {
584                 DBG_DEBUG("server_id_db_init failed\n");
585                 status = NT_STATUS_NO_MEMORY;
586                 goto done;
587         }
588
589         messaging_register(ctx, NULL, MSG_PING, ping_message);
590
591         /* Register some debugging related messages */
592
593         register_msg_pool_usage(ctx);
594         register_dmalloc_msgs(ctx);
595         debug_register_msgs(ctx);
596
597         {
598                 struct server_id_buf tmp;
599                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
600         }
601
602         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
603
604         status = NT_STATUS_OK;
605 done:
606         TALLOC_FREE(frame);
607
608         return status;
609 }
610
611 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
612                                          struct tevent_context *ev)
613 {
614         struct messaging_context *ctx = NULL;
615         NTSTATUS status;
616
617         status = messaging_init_internal(mem_ctx,
618                                          ev,
619                                          &ctx);
620         if (!NT_STATUS_IS_OK(status)) {
621                 return NULL;
622         }
623
624         return ctx;
625 }
626
627 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
628 {
629         return msg_ctx->id;
630 }
631
632 /*
633  * re-init after a fork
634  */
635 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
636 {
637         int ret;
638         char *lck_path;
639
640         TALLOC_FREE(msg_ctx->msg_dgm_ref);
641         TALLOC_FREE(msg_ctx->msg_ctdb_ref);
642
643         msg_ctx->id = (struct server_id) {
644                 .pid = getpid(), .vnn = msg_ctx->id.vnn
645         };
646
647         lck_path = lock_path(talloc_tos(), "msg.lock");
648         if (lck_path == NULL) {
649                 return NT_STATUS_NO_MEMORY;
650         }
651
652         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
653                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
654                 private_path("msg.sock"), lck_path,
655                 messaging_recv_cb, msg_ctx, &ret);
656
657         if (msg_ctx->msg_dgm_ref == NULL) {
658                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
659                 return map_nt_error_from_unix(ret);
660         }
661
662         if (lp_clustering()) {
663                 msg_ctx->msg_ctdb_ref = messaging_ctdb_ref(
664                         msg_ctx, msg_ctx->event_ctx,
665                         lp_ctdbd_socket(), lp_ctdb_timeout(),
666                         msg_ctx->id.unique_id, messaging_recv_cb, msg_ctx,
667                         &ret);
668                 if (msg_ctx->msg_ctdb_ref == NULL) {
669                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
670                                    strerror(ret));
671                         return map_nt_error_from_unix(ret);
672                 }
673         }
674
675         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
676
677         return NT_STATUS_OK;
678 }
679
680
681 /*
682  * Register a dispatch function for a particular message type. Allow multiple
683  * registrants
684 */
685 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
686                             void *private_data,
687                             uint32_t msg_type,
688                             void (*fn)(struct messaging_context *msg,
689                                        void *private_data, 
690                                        uint32_t msg_type, 
691                                        struct server_id server_id,
692                                        DATA_BLOB *data))
693 {
694         struct messaging_callback *cb;
695
696         DEBUG(5, ("Registering messaging pointer for type %u - "
697                   "private_data=%p\n",
698                   (unsigned)msg_type, private_data));
699
700         /*
701          * Only one callback per type
702          */
703
704         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
705                 /* we allow a second registration of the same message
706                    type if it has a different private pointer. This is
707                    needed in, for example, the internal notify code,
708                    which creates a new notify context for each tree
709                    connect, and expects to receive messages to each of
710                    them. */
711                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
712                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
713                                   (unsigned)msg_type, private_data));
714                         cb->fn = fn;
715                         cb->private_data = private_data;
716                         return NT_STATUS_OK;
717                 }
718         }
719
720         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
721                 return NT_STATUS_NO_MEMORY;
722         }
723
724         cb->msg_type = msg_type;
725         cb->fn = fn;
726         cb->private_data = private_data;
727
728         DLIST_ADD(msg_ctx->callbacks, cb);
729         return NT_STATUS_OK;
730 }
731
732 /*
733   De-register the function for a particular message type.
734 */
735 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
736                           void *private_data)
737 {
738         struct messaging_callback *cb, *next;
739
740         for (cb = ctx->callbacks; cb; cb = next) {
741                 next = cb->next;
742                 if ((cb->msg_type == msg_type)
743                     && (cb->private_data == private_data)) {
744                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
745                                   (unsigned)msg_type, private_data));
746                         DLIST_REMOVE(ctx->callbacks, cb);
747                         TALLOC_FREE(cb);
748                 }
749         }
750 }
751
752 /*
753   Send a message to a particular server
754 */
755 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
756                         struct server_id server, uint32_t msg_type,
757                         const DATA_BLOB *data)
758 {
759         struct iovec iov = {0};
760
761         if (data != NULL) {
762                 iov.iov_base = data->data;
763                 iov.iov_len = data->length;
764         };
765
766         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
767 }
768
769 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
770                             struct server_id server, uint32_t msg_type,
771                             const uint8_t *buf, size_t len)
772 {
773         DATA_BLOB blob = data_blob_const(buf, len);
774         return messaging_send(msg_ctx, server, msg_type, &blob);
775 }
776
777 static int messaging_post_self(struct messaging_context *msg_ctx,
778                                struct server_id src, struct server_id dst,
779                                uint32_t msg_type,
780                                const struct iovec *iov, int iovlen,
781                                const int *fds, size_t num_fds)
782 {
783         struct messaging_rec *rec;
784         bool ok;
785
786         rec = messaging_rec_create(
787                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
788         if (rec == NULL) {
789                 return ENOMEM;
790         }
791
792         ok = messaging_alert_event_contexts(msg_ctx);
793         if (!ok) {
794                 TALLOC_FREE(rec);
795                 return ENOMEM;
796         }
797
798         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
799
800         return 0;
801 }
802
803 int messaging_send_iov_from(struct messaging_context *msg_ctx,
804                             struct server_id src, struct server_id dst,
805                             uint32_t msg_type,
806                             const struct iovec *iov, int iovlen,
807                             const int *fds, size_t num_fds)
808 {
809         int ret;
810         uint8_t hdr[MESSAGE_HDR_LENGTH];
811         struct iovec iov2[iovlen+1];
812
813         if (server_id_is_disconnected(&dst)) {
814                 return EINVAL;
815         }
816
817         if (num_fds > INT8_MAX) {
818                 return EINVAL;
819         }
820
821         if (server_id_equal(&dst, &msg_ctx->id)) {
822                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
823                                           iov, iovlen, fds, num_fds);
824                 return ret;
825         }
826
827         message_hdr_put(hdr, msg_type, src, dst);
828         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
829         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
830
831         if (dst.vnn != msg_ctx->id.vnn) {
832                 if (num_fds > 0) {
833                         return ENOSYS;
834                 }
835
836                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
837                 return ret;
838         }
839
840         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
841
842         if (ret == EACCES) {
843                 become_root();
844                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
845                                          fds, num_fds);
846                 unbecome_root();
847         }
848
849         if (ret == ECONNREFUSED) {
850                 /*
851                  * Linux returns this when a socket exists in the file
852                  * system without a listening process. This is not
853                  * documented in susv4 or the linux manpages, but it's
854                  * easily testable. For the higher levels this is the
855                  * same as "destination does not exist"
856                  */
857                 ret = ENOENT;
858         }
859
860         return ret;
861 }
862
863 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
864                             struct server_id dst, uint32_t msg_type,
865                             const struct iovec *iov, int iovlen,
866                             const int *fds, size_t num_fds)
867 {
868         int ret;
869
870         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
871                                       iov, iovlen, fds, num_fds);
872         if (ret != 0) {
873                 return map_nt_error_from_unix(ret);
874         }
875         return NT_STATUS_OK;
876 }
877
878 struct send_all_state {
879         struct messaging_context *msg_ctx;
880         int msg_type;
881         const void *buf;
882         size_t len;
883 };
884
885 static int send_all_fn(pid_t pid, void *private_data)
886 {
887         struct send_all_state *state = private_data;
888         NTSTATUS status;
889
890         if (pid == getpid()) {
891                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
892                 return 0;
893         }
894
895         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
896                                     state->msg_type, state->buf, state->len);
897         if (!NT_STATUS_IS_OK(status)) {
898                 DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
899                             (uintmax_t)pid, nt_errstr(status));
900         }
901
902         return 0;
903 }
904
905 void messaging_send_all(struct messaging_context *msg_ctx,
906                         int msg_type, const void *buf, size_t len)
907 {
908         struct send_all_state state = {
909                 .msg_ctx = msg_ctx, .msg_type = msg_type,
910                 .buf = buf, .len = len
911         };
912         int ret;
913
914 #ifdef CLUSTER_SUPPORT
915         if (lp_clustering()) {
916                 struct ctdbd_connection *conn = messaging_ctdb_connection();
917                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
918                 struct iovec iov[] = {
919                         { .iov_base = msghdr,
920                           .iov_len = sizeof(msghdr) },
921                         { .iov_base = discard_const_p(void, buf),
922                           .iov_len = len }
923                 };
924
925                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
926                                 (struct server_id) {0});
927
928                 ret = ctdbd_messaging_send_iov(
929                         conn, CTDB_BROADCAST_CONNECTED,
930                         CTDB_SRVID_SAMBA_PROCESS,
931                         iov, ARRAY_SIZE(iov));
932                 if (ret != 0) {
933                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
934                                     strerror(ret));
935                 }
936
937                 return;
938         }
939 #endif
940
941         ret = messaging_dgm_forall(send_all_fn, &state);
942         if (ret != 0) {
943                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
944                             strerror(ret));
945         }
946 }
947
948 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
949                                                struct messaging_rec *rec)
950 {
951         struct messaging_rec *result;
952         size_t fds_size = sizeof(int64_t) * rec->num_fds;
953         size_t payload_len;
954
955         payload_len = rec->buf.length + fds_size;
956         if (payload_len < rec->buf.length) {
957                 /* overflow */
958                 return NULL;
959         }
960
961         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
962                                       payload_len);
963         if (result == NULL) {
964                 return NULL;
965         }
966         *result = *rec;
967
968         /* Doesn't fail, see talloc_pooled_object */
969
970         result->buf.data = talloc_memdup(result, rec->buf.data,
971                                          rec->buf.length);
972
973         result->fds = NULL;
974         if (result->num_fds > 0) {
975                 result->fds = talloc_memdup(result, rec->fds, fds_size);
976         }
977
978         return result;
979 }
980
981 struct messaging_filtered_read_state {
982         struct tevent_context *ev;
983         struct messaging_context *msg_ctx;
984         struct messaging_dgm_fde *fde;
985         struct messaging_ctdb_fde *cluster_fde;
986
987         bool (*filter)(struct messaging_rec *rec, void *private_data);
988         void *private_data;
989
990         struct messaging_rec *rec;
991 };
992
993 static void messaging_filtered_read_cleanup(struct tevent_req *req,
994                                             enum tevent_req_state req_state);
995
996 struct tevent_req *messaging_filtered_read_send(
997         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
998         struct messaging_context *msg_ctx,
999         bool (*filter)(struct messaging_rec *rec, void *private_data),
1000         void *private_data)
1001 {
1002         struct tevent_req *req;
1003         struct messaging_filtered_read_state *state;
1004         size_t new_waiters_len;
1005         bool ok;
1006
1007         req = tevent_req_create(mem_ctx, &state,
1008                                 struct messaging_filtered_read_state);
1009         if (req == NULL) {
1010                 return NULL;
1011         }
1012         state->ev = ev;
1013         state->msg_ctx = msg_ctx;
1014         state->filter = filter;
1015         state->private_data = private_data;
1016
1017         /*
1018          * We have to defer the callback here, as we might be called from
1019          * within a different tevent_context than state->ev
1020          */
1021         tevent_req_defer_callback(req, state->ev);
1022
1023         state->fde = messaging_dgm_register_tevent_context(state, ev);
1024         if (tevent_req_nomem(state->fde, req)) {
1025                 return tevent_req_post(req, ev);
1026         }
1027
1028         if (lp_clustering()) {
1029                 state->cluster_fde =
1030                         messaging_ctdb_register_tevent_context(state, ev);
1031                 if (tevent_req_nomem(state->cluster_fde, req)) {
1032                         return tevent_req_post(req, ev);
1033                 }
1034         }
1035
1036         /*
1037          * We add ourselves to the "new_waiters" array, not the "waiters"
1038          * array. If we are called from within messaging_read_done,
1039          * messaging_dispatch_rec will be in an active for-loop on
1040          * "waiters". We must be careful not to mess with this array, because
1041          * it could mean that a single event is being delivered twice.
1042          */
1043
1044         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1045
1046         if (new_waiters_len == msg_ctx->num_new_waiters) {
1047                 struct tevent_req **tmp;
1048
1049                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1050                                      struct tevent_req *, new_waiters_len+1);
1051                 if (tevent_req_nomem(tmp, req)) {
1052                         return tevent_req_post(req, ev);
1053                 }
1054                 msg_ctx->new_waiters = tmp;
1055         }
1056
1057         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1058         msg_ctx->num_new_waiters += 1;
1059         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1060
1061         ok = messaging_register_event_context(msg_ctx, ev);
1062         if (!ok) {
1063                 tevent_req_oom(req);
1064                 return tevent_req_post(req, ev);
1065         }
1066
1067         return req;
1068 }
1069
1070 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1071                                             enum tevent_req_state req_state)
1072 {
1073         struct messaging_filtered_read_state *state = tevent_req_data(
1074                 req, struct messaging_filtered_read_state);
1075         struct messaging_context *msg_ctx = state->msg_ctx;
1076         size_t i;
1077         bool ok;
1078
1079         tevent_req_set_cleanup_fn(req, NULL);
1080
1081         TALLOC_FREE(state->fde);
1082         TALLOC_FREE(state->cluster_fde);
1083
1084         ok = messaging_deregister_event_context(msg_ctx, state->ev);
1085         if (!ok) {
1086                 abort();
1087         }
1088
1089         /*
1090          * Just set the [new_]waiters entry to NULL, be careful not to mess
1091          * with the other "waiters" array contents. We are often called from
1092          * within "messaging_dispatch_rec", which loops over
1093          * "waiters". Messing with the "waiters" array will mess up that
1094          * for-loop.
1095          */
1096
1097         for (i=0; i<msg_ctx->num_waiters; i++) {
1098                 if (msg_ctx->waiters[i] == req) {
1099                         msg_ctx->waiters[i] = NULL;
1100                         return;
1101                 }
1102         }
1103
1104         for (i=0; i<msg_ctx->num_new_waiters; i++) {
1105                 if (msg_ctx->new_waiters[i] == req) {
1106                         msg_ctx->new_waiters[i] = NULL;
1107                         return;
1108                 }
1109         }
1110 }
1111
1112 static void messaging_filtered_read_done(struct tevent_req *req,
1113                                          struct messaging_rec *rec)
1114 {
1115         struct messaging_filtered_read_state *state = tevent_req_data(
1116                 req, struct messaging_filtered_read_state);
1117
1118         state->rec = messaging_rec_dup(state, rec);
1119         if (tevent_req_nomem(state->rec, req)) {
1120                 return;
1121         }
1122         tevent_req_done(req);
1123 }
1124
1125 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1126                                  struct messaging_rec **presult)
1127 {
1128         struct messaging_filtered_read_state *state = tevent_req_data(
1129                 req, struct messaging_filtered_read_state);
1130         int err;
1131
1132         if (tevent_req_is_unix_error(req, &err)) {
1133                 tevent_req_received(req);
1134                 return err;
1135         }
1136         if (presult != NULL) {
1137                 *presult = talloc_move(mem_ctx, &state->rec);
1138         }
1139         return 0;
1140 }
1141
1142 struct messaging_read_state {
1143         uint32_t msg_type;
1144         struct messaging_rec *rec;
1145 };
1146
1147 static bool messaging_read_filter(struct messaging_rec *rec,
1148                                   void *private_data);
1149 static void messaging_read_done(struct tevent_req *subreq);
1150
1151 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1152                                        struct tevent_context *ev,
1153                                        struct messaging_context *msg,
1154                                        uint32_t msg_type)
1155 {
1156         struct tevent_req *req, *subreq;
1157         struct messaging_read_state *state;
1158
1159         req = tevent_req_create(mem_ctx, &state,
1160                                 struct messaging_read_state);
1161         if (req == NULL) {
1162                 return NULL;
1163         }
1164         state->msg_type = msg_type;
1165
1166         subreq = messaging_filtered_read_send(state, ev, msg,
1167                                               messaging_read_filter, state);
1168         if (tevent_req_nomem(subreq, req)) {
1169                 return tevent_req_post(req, ev);
1170         }
1171         tevent_req_set_callback(subreq, messaging_read_done, req);
1172         return req;
1173 }
1174
1175 static bool messaging_read_filter(struct messaging_rec *rec,
1176                                   void *private_data)
1177 {
1178         struct messaging_read_state *state = talloc_get_type_abort(
1179                 private_data, struct messaging_read_state);
1180
1181         if (rec->num_fds != 0) {
1182                 return false;
1183         }
1184
1185         return rec->msg_type == state->msg_type;
1186 }
1187
1188 static void messaging_read_done(struct tevent_req *subreq)
1189 {
1190         struct tevent_req *req = tevent_req_callback_data(
1191                 subreq, struct tevent_req);
1192         struct messaging_read_state *state = tevent_req_data(
1193                 req, struct messaging_read_state);
1194         int ret;
1195
1196         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1197         TALLOC_FREE(subreq);
1198         if (tevent_req_error(req, ret)) {
1199                 return;
1200         }
1201         tevent_req_done(req);
1202 }
1203
1204 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1205                         struct messaging_rec **presult)
1206 {
1207         struct messaging_read_state *state = tevent_req_data(
1208                 req, struct messaging_read_state);
1209         int err;
1210
1211         if (tevent_req_is_unix_error(req, &err)) {
1212                 return err;
1213         }
1214         if (presult != NULL) {
1215                 *presult = talloc_move(mem_ctx, &state->rec);
1216         }
1217         return 0;
1218 }
1219
1220 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1221 {
1222         if (msg_ctx->num_new_waiters == 0) {
1223                 return true;
1224         }
1225
1226         if (talloc_array_length(msg_ctx->waiters) <
1227             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1228                 struct tevent_req **tmp;
1229                 tmp = talloc_realloc(
1230                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1231                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1232                 if (tmp == NULL) {
1233                         DEBUG(1, ("%s: talloc failed\n", __func__));
1234                         return false;
1235                 }
1236                 msg_ctx->waiters = tmp;
1237         }
1238
1239         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1240                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1241
1242         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1243         msg_ctx->num_new_waiters = 0;
1244
1245         return true;
1246 }
1247
1248 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1249                                        struct messaging_rec *rec)
1250 {
1251         struct messaging_callback *cb, *next;
1252
1253         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1254                 size_t j;
1255
1256                 next = cb->next;
1257                 if (cb->msg_type != rec->msg_type) {
1258                         continue;
1259                 }
1260
1261                 /*
1262                  * the old style callbacks don't support fd passing
1263                  */
1264                 for (j=0; j < rec->num_fds; j++) {
1265                         int fd = rec->fds[j];
1266                         close(fd);
1267                 }
1268                 rec->num_fds = 0;
1269                 rec->fds = NULL;
1270
1271                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1272                        rec->src, &rec->buf);
1273
1274                 return true;
1275         }
1276
1277         return false;
1278 }
1279
1280 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1281                                        struct tevent_context *ev,
1282                                        struct messaging_rec *rec)
1283 {
1284         size_t i;
1285
1286         if (!messaging_append_new_waiters(msg_ctx)) {
1287                 return false;
1288         }
1289
1290         i = 0;
1291         while (i < msg_ctx->num_waiters) {
1292                 struct tevent_req *req;
1293                 struct messaging_filtered_read_state *state;
1294
1295                 req = msg_ctx->waiters[i];
1296                 if (req == NULL) {
1297                         /*
1298                          * This got cleaned up. In the meantime,
1299                          * move everything down one. We need
1300                          * to keep the order of waiters, as
1301                          * other code may depend on this.
1302                          */
1303                         if (i < msg_ctx->num_waiters - 1) {
1304                                 memmove(&msg_ctx->waiters[i],
1305                                         &msg_ctx->waiters[i+1],
1306                                         sizeof(struct tevent_req *) *
1307                                             (msg_ctx->num_waiters - i - 1));
1308                         }
1309                         msg_ctx->num_waiters -= 1;
1310                         continue;
1311                 }
1312
1313                 state = tevent_req_data(
1314                         req, struct messaging_filtered_read_state);
1315                 if ((ev == state->ev) &&
1316                     state->filter(rec, state->private_data)) {
1317                         messaging_filtered_read_done(req, rec);
1318                         return true;
1319                 }
1320
1321                 i += 1;
1322         }
1323
1324         return false;
1325 }
1326
1327 /*
1328   Dispatch one messaging_rec
1329 */
1330 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1331                                    struct tevent_context *ev,
1332                                    struct messaging_rec *rec)
1333 {
1334         bool consumed;
1335         size_t i;
1336
1337         if (ev == msg_ctx->event_ctx) {
1338                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1339                 if (consumed) {
1340                         return;
1341                 }
1342         }
1343
1344         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1345         if (consumed) {
1346                 return;
1347         }
1348
1349         if (ev != msg_ctx->event_ctx) {
1350                 struct iovec iov;
1351                 int fds[rec->num_fds];
1352                 int ret;
1353
1354                 /*
1355                  * We've been listening on a nested event
1356                  * context. Messages need to be handled in the main
1357                  * event context, so post to ourselves
1358                  */
1359
1360                 iov.iov_base = rec->buf.data;
1361                 iov.iov_len = rec->buf.length;
1362
1363                 for (i=0; i<rec->num_fds; i++) {
1364                         fds[i] = rec->fds[i];
1365                 }
1366
1367                 ret = messaging_post_self(
1368                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1369                         &iov, 1, fds, rec->num_fds);
1370                 if (ret == 0) {
1371                         return;
1372                 }
1373         }
1374
1375         /*
1376          * If the fd-array isn't used, just close it.
1377          */
1378         for (i=0; i < rec->num_fds; i++) {
1379                 int fd = rec->fds[i];
1380                 close(fd);
1381         }
1382         rec->num_fds = 0;
1383         rec->fds = NULL;
1384 }
1385
1386 static int mess_parent_dgm_cleanup(void *private_data);
1387 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1388
1389 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1390 {
1391         struct tevent_req *req;
1392
1393         req = background_job_send(
1394                 msg, msg->event_ctx, msg, NULL, 0,
1395                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1396                             60*15),
1397                 mess_parent_dgm_cleanup, msg);
1398         if (req == NULL) {
1399                 DBG_WARNING("background_job_send failed\n");
1400                 return false;
1401         }
1402         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1403         return true;
1404 }
1405
1406 static int mess_parent_dgm_cleanup(void *private_data)
1407 {
1408         int ret;
1409
1410         ret = messaging_dgm_wipe();
1411         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1412                    ret ? strerror(ret) : "ok"));
1413         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1414                            60*15);
1415 }
1416
1417 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1418 {
1419         struct messaging_context *msg = tevent_req_callback_data(
1420                 req, struct messaging_context);
1421         NTSTATUS status;
1422
1423         status = background_job_recv(req);
1424         TALLOC_FREE(req);
1425         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1426                   nt_errstr(status)));
1427
1428         req = background_job_send(
1429                 msg, msg->event_ctx, msg, NULL, 0,
1430                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1431                             60*15),
1432                 mess_parent_dgm_cleanup, msg);
1433         if (req == NULL) {
1434                 DEBUG(1, ("background_job_send failed\n"));
1435                 return;
1436         }
1437         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1438 }
1439
1440 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1441 {
1442         int ret;
1443
1444         if (pid == 0) {
1445                 ret = messaging_dgm_wipe();
1446         } else {
1447                 ret = messaging_dgm_cleanup(pid);
1448         }
1449
1450         return ret;
1451 }
1452
1453 struct tevent_context *messaging_tevent_context(
1454         struct messaging_context *msg_ctx)
1455 {
1456         return msg_ctx->event_ctx;
1457 }
1458
1459 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1460 {
1461         return msg_ctx->names_db;
1462 }
1463
1464 /** @} **/