ldb: Build lmdb backend also in non-AD case
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messaging/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messaging/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
65 #include "source3/lib/tallocmsg.h"
66
67 #ifdef CLUSTER_SUPPORT
68 #include "ctdb_protocol.h"
69 #endif
70
71 struct messaging_callback {
72         struct messaging_callback *prev, *next;
73         uint32_t msg_type;
74         void (*fn)(struct messaging_context *msg, void *private_data, 
75                    uint32_t msg_type, 
76                    struct server_id server_id, DATA_BLOB *data);
77         void *private_data;
78 };
79
80 struct messaging_registered_ev {
81         struct tevent_context *ev;
82         struct tevent_immediate *im;
83         size_t refcount;
84 };
85
86 struct messaging_context {
87         struct server_id id;
88         struct tevent_context *event_ctx;
89         struct messaging_callback *callbacks;
90
91         struct messaging_rec *posted_msgs;
92
93         struct messaging_registered_ev *event_contexts;
94
95         struct tevent_req **new_waiters;
96         size_t num_new_waiters;
97
98         struct tevent_req **waiters;
99         size_t num_waiters;
100
101         struct server_id_db *names_db;
102
103         TALLOC_CTX *per_process_talloc_ctx;
104 };
105
106 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107                                                struct messaging_rec *rec);
108 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109                                        struct messaging_rec *rec);
110 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111                                        struct tevent_context *ev,
112                                        struct messaging_rec *rec);
113 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114                                    struct tevent_context *ev,
115                                    struct messaging_rec *rec);
116
117 /****************************************************************************
118  A useful function for testing the message system.
119 ****************************************************************************/
120
121 static void ping_message(struct messaging_context *msg_ctx,
122                          void *private_data,
123                          uint32_t msg_type,
124                          struct server_id src,
125                          DATA_BLOB *data)
126 {
127         struct server_id_buf idbuf;
128
129         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130                   server_id_str_buf(src, &idbuf), (int)data->length,
131                   data->data ? (char *)data->data : ""));
132
133         messaging_send(msg_ctx, src, MSG_PONG, data);
134 }
135
136 struct messaging_rec *messaging_rec_create(
137         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138         uint32_t msg_type, const struct iovec *iov, int iovlen,
139         const int *fds, size_t num_fds)
140 {
141         ssize_t buflen;
142         uint8_t *buf;
143         struct messaging_rec *result;
144
145         if (num_fds > INT8_MAX) {
146                 return NULL;
147         }
148
149         buflen = iov_buflen(iov, iovlen);
150         if (buflen == -1) {
151                 return NULL;
152         }
153         buf = talloc_array(mem_ctx, uint8_t, buflen);
154         if (buf == NULL) {
155                 return NULL;
156         }
157         iov_buf(iov, iovlen, buf, buflen);
158
159         {
160                 struct messaging_rec rec;
161                 int64_t fds64[MAX(1, num_fds)];
162                 size_t i;
163
164                 for (i=0; i<num_fds; i++) {
165                         fds64[i] = fds[i];
166                 }
167
168                 rec = (struct messaging_rec) {
169                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170                         .src = src, .dest = dst,
171                         .buf.data = buf, .buf.length = buflen,
172                         .num_fds = num_fds, .fds = fds64,
173                 };
174
175                 result = messaging_rec_dup(mem_ctx, &rec);
176         }
177
178         TALLOC_FREE(buf);
179
180         return result;
181 }
182
183 static bool messaging_register_event_context(struct messaging_context *ctx,
184                                              struct tevent_context *ev)
185 {
186         size_t i, num_event_contexts;
187         struct messaging_registered_ev *free_reg = NULL;
188         struct messaging_registered_ev *tmp;
189
190         num_event_contexts = talloc_array_length(ctx->event_contexts);
191
192         for (i=0; i<num_event_contexts; i++) {
193                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194
195                 if (reg->refcount == 0) {
196                         if (reg->ev != NULL) {
197                                 abort();
198                         }
199                         free_reg = reg;
200                         /*
201                          * We continue here and may find another
202                          * free_req, but the important thing is
203                          * that we continue to search for an
204                          * existing registration in the loop.
205                          */
206                         continue;
207                 }
208
209                 if (reg->ev == ev) {
210                         reg->refcount += 1;
211                         return true;
212                 }
213         }
214
215         if (free_reg == NULL) {
216                 struct tevent_immediate *im = NULL;
217
218                 im = tevent_create_immediate(ctx);
219                 if (im == NULL) {
220                         return false;
221                 }
222
223                 tmp = talloc_realloc(ctx, ctx->event_contexts,
224                                      struct messaging_registered_ev,
225                                      num_event_contexts+1);
226                 if (tmp == NULL) {
227                         return false;
228                 }
229                 ctx->event_contexts = tmp;
230
231                 free_reg = &ctx->event_contexts[num_event_contexts];
232                 free_reg->im = talloc_move(ctx->event_contexts, &im);
233         }
234
235         /*
236          * free_reg->im might be cached
237          */
238         free_reg->ev = ev;
239         free_reg->refcount = 1;
240
241         return true;
242 }
243
244 static bool messaging_deregister_event_context(struct messaging_context *ctx,
245                                                struct tevent_context *ev)
246 {
247         size_t i, num_event_contexts;
248
249         num_event_contexts = talloc_array_length(ctx->event_contexts);
250
251         for (i=0; i<num_event_contexts; i++) {
252                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
253
254                 if (reg->refcount == 0) {
255                         continue;
256                 }
257
258                 if (reg->ev == ev) {
259                         reg->refcount -= 1;
260
261                         if (reg->refcount == 0) {
262                                 /*
263                                  * The primary event context
264                                  * is never unregistered using
265                                  * messaging_deregister_event_context()
266                                  * it's only registered using
267                                  * messaging_register_event_context().
268                                  */
269                                 SMB_ASSERT(ev != ctx->event_ctx);
270                                 SMB_ASSERT(reg->ev != ctx->event_ctx);
271
272                                 /*
273                                  * Not strictly necessary, just
274                                  * paranoia
275                                  */
276                                 reg->ev = NULL;
277
278                                 /*
279                                  * Do not talloc_free(reg->im),
280                                  * recycle immediates events.
281                                  *
282                                  * We just invalidate it using
283                                  * the primary event context,
284                                  * which is never unregistered.
285                                  */
286                                 tevent_schedule_immediate(reg->im,
287                                                           ctx->event_ctx,
288                                                           NULL, NULL);
289                         }
290                         return true;
291                 }
292         }
293         return false;
294 }
295
296 static void messaging_post_main_event_context(struct tevent_context *ev,
297                                               struct tevent_immediate *im,
298                                               void *private_data)
299 {
300         struct messaging_context *ctx = talloc_get_type_abort(
301                 private_data, struct messaging_context);
302
303         while (ctx->posted_msgs != NULL) {
304                 struct messaging_rec *rec = ctx->posted_msgs;
305                 bool consumed;
306
307                 DLIST_REMOVE(ctx->posted_msgs, rec);
308
309                 consumed = messaging_dispatch_classic(ctx, rec);
310                 if (!consumed) {
311                         consumed = messaging_dispatch_waiters(
312                                 ctx, ctx->event_ctx, rec);
313                 }
314
315                 if (!consumed) {
316                         uint8_t i;
317
318                         for (i=0; i<rec->num_fds; i++) {
319                                 close(rec->fds[i]);
320                         }
321                 }
322
323                 TALLOC_FREE(rec);
324         }
325 }
326
327 static void messaging_post_sub_event_context(struct tevent_context *ev,
328                                              struct tevent_immediate *im,
329                                              void *private_data)
330 {
331         struct messaging_context *ctx = talloc_get_type_abort(
332                 private_data, struct messaging_context);
333         struct messaging_rec *rec, *next;
334
335         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336                 bool consumed;
337
338                 next = rec->next;
339
340                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
341                 if (consumed) {
342                         DLIST_REMOVE(ctx->posted_msgs, rec);
343                         TALLOC_FREE(rec);
344                 }
345         }
346 }
347
348 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
349 {
350         size_t i, num_event_contexts;
351
352         num_event_contexts = talloc_array_length(ctx->event_contexts);
353
354         for (i=0; i<num_event_contexts; i++) {
355                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
356
357                 if (reg->refcount == 0) {
358                         continue;
359                 }
360
361                 /*
362                  * We depend on schedule_immediate to work
363                  * multiple times. Might be a bit inefficient,
364                  * but this needs to be proven in tests. The
365                  * alternatively would be to track whether the
366                  * immediate has already been scheduled. For
367                  * now, avoid that complexity here.
368                  */
369
370                 if (reg->ev == ctx->event_ctx) {
371                         tevent_schedule_immediate(
372                                 reg->im, reg->ev,
373                                 messaging_post_main_event_context,
374                                 ctx);
375                 } else {
376                         tevent_schedule_immediate(
377                                 reg->im, reg->ev,
378                                 messaging_post_sub_event_context,
379                                 ctx);
380                 }
381
382         }
383         return true;
384 }
385
386 static void messaging_recv_cb(struct tevent_context *ev,
387                               const uint8_t *msg, size_t msg_len,
388                               int *fds, size_t num_fds,
389                               void *private_data)
390 {
391         struct messaging_context *msg_ctx = talloc_get_type_abort(
392                 private_data, struct messaging_context);
393         struct server_id_buf idbuf;
394         struct messaging_rec rec;
395         int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
396         size_t i;
397
398         if (msg_len < MESSAGE_HDR_LENGTH) {
399                 DBG_WARNING("message too short: %zu\n", msg_len);
400                 return;
401         }
402
403         if (num_fds > INT8_MAX) {
404                 DBG_WARNING("too many fds: %zu\n", num_fds);
405                 return;
406         }
407
408         for (i=0; i < num_fds; i++) {
409                 fds64[i] = fds[i];
410         }
411
412         rec = (struct messaging_rec) {
413                 .msg_version = MESSAGE_VERSION,
414                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
415                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
416                 .num_fds = num_fds,
417                 .fds = fds64,
418         };
419
420         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
421
422         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
423                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
424                   server_id_str_buf(rec.src, &idbuf));
425
426         if (server_id_same_process(&rec.src, &msg_ctx->id)) {
427                 DBG_DEBUG("Ignoring self-send\n");
428                 return;
429         }
430
431         messaging_dispatch_rec(msg_ctx, ev, &rec);
432
433         for (i=0; i<num_fds; i++) {
434                 fds[i] = fds64[i];
435         }
436 }
437
438 static int messaging_context_destructor(struct messaging_context *ctx)
439 {
440         size_t i;
441
442         for (i=0; i<ctx->num_new_waiters; i++) {
443                 if (ctx->new_waiters[i] != NULL) {
444                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
445                         ctx->new_waiters[i] = NULL;
446                 }
447         }
448         for (i=0; i<ctx->num_waiters; i++) {
449                 if (ctx->waiters[i] != NULL) {
450                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
451                         ctx->waiters[i] = NULL;
452                 }
453         }
454
455         /*
456          * The immediates from messaging_alert_event_contexts
457          * reference "ctx". Don't let them outlive the
458          * messaging_context we're destroying here.
459          */
460         TALLOC_FREE(ctx->event_contexts);
461
462         return 0;
463 }
464
465 static const char *private_path(const char *name)
466 {
467         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
468 }
469
470 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
471                                         struct tevent_context *ev,
472                                         struct messaging_context **pmsg_ctx)
473 {
474         TALLOC_CTX *frame;
475         struct messaging_context *ctx;
476         NTSTATUS status;
477         int ret;
478         const char *lck_path;
479         const char *priv_path;
480         void *ref;
481         bool ok;
482
483         /*
484          * sec_init() *must* be called before any other
485          * functions that use sec_XXX(). e.g. sec_initial_uid().
486          */
487
488         sec_init();
489
490         lck_path = lock_path(talloc_tos(), "msg.lock");
491         if (lck_path == NULL) {
492                 return NT_STATUS_NO_MEMORY;
493         }
494
495         ok = directory_create_or_exist_strict(lck_path,
496                                               sec_initial_uid(),
497                                               0755);
498         if (!ok) {
499                 DBG_DEBUG("Could not create lock directory: %s\n",
500                           strerror(errno));
501                 return NT_STATUS_ACCESS_DENIED;
502         }
503
504         priv_path = private_path("msg.sock");
505         if (priv_path == NULL) {
506                 return NT_STATUS_NO_MEMORY;
507         }
508
509         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
510                                               0700);
511         if (!ok) {
512                 DBG_DEBUG("Could not create msg directory: %s\n",
513                           strerror(errno));
514                 return NT_STATUS_ACCESS_DENIED;
515         }
516
517         frame = talloc_stackframe();
518         if (frame == NULL) {
519                 return NT_STATUS_NO_MEMORY;
520         }
521
522         ctx = talloc_zero(frame, struct messaging_context);
523         if (ctx == NULL) {
524                 status = NT_STATUS_NO_MEMORY;
525                 goto done;
526         }
527
528         ctx->id = (struct server_id) {
529                 .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
530         };
531
532         ctx->event_ctx = ev;
533
534         ctx->per_process_talloc_ctx = talloc_new(ctx);
535         if (ctx->per_process_talloc_ctx == NULL) {
536                 status = NT_STATUS_NO_MEMORY;
537                 goto done;
538         }
539
540         ok = messaging_register_event_context(ctx, ev);
541         if (!ok) {
542                 status = NT_STATUS_NO_MEMORY;
543                 goto done;
544         }
545
546         ref = messaging_dgm_ref(
547                 ctx->per_process_talloc_ctx,
548                 ctx->event_ctx,
549                 &ctx->id.unique_id,
550                 priv_path,
551                 lck_path,
552                 messaging_recv_cb,
553                 ctx,
554                 &ret);
555         if (ref == NULL) {
556                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
557                 status = map_nt_error_from_unix(ret);
558                 goto done;
559         }
560         talloc_set_destructor(ctx, messaging_context_destructor);
561
562 #ifdef CLUSTER_SUPPORT
563         if (lp_clustering()) {
564                 ref = messaging_ctdb_ref(
565                         ctx->per_process_talloc_ctx,
566                         ctx->event_ctx,
567                         lp_ctdbd_socket(),
568                         lp_ctdb_timeout(),
569                         ctx->id.unique_id,
570                         messaging_recv_cb,
571                         ctx,
572                         &ret);
573                 if (ref == NULL) {
574                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
575                                    strerror(ret));
576                         status = map_nt_error_from_unix(ret);
577                         goto done;
578                 }
579         }
580 #endif
581
582         ctx->id.vnn = get_my_vnn();
583
584         ctx->names_db = server_id_db_init(ctx,
585                                           ctx->id,
586                                           lp_lock_directory(),
587                                           0,
588                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
589         if (ctx->names_db == NULL) {
590                 DBG_DEBUG("server_id_db_init failed\n");
591                 status = NT_STATUS_NO_MEMORY;
592                 goto done;
593         }
594
595         messaging_register(ctx, NULL, MSG_PING, ping_message);
596
597         /* Register some debugging related messages */
598
599         register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
600         register_dmalloc_msgs(ctx);
601         debug_register_msgs(ctx);
602
603         {
604                 struct server_id_buf tmp;
605                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
606         }
607
608         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
609
610         status = NT_STATUS_OK;
611 done:
612         TALLOC_FREE(frame);
613
614         return status;
615 }
616
617 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
618                                          struct tevent_context *ev)
619 {
620         struct messaging_context *ctx = NULL;
621         NTSTATUS status;
622
623         status = messaging_init_internal(mem_ctx,
624                                          ev,
625                                          &ctx);
626         if (!NT_STATUS_IS_OK(status)) {
627                 return NULL;
628         }
629
630         return ctx;
631 }
632
633 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
634 {
635         return msg_ctx->id;
636 }
637
638 /*
639  * re-init after a fork
640  */
641 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
642 {
643         int ret;
644         char *lck_path;
645         void *ref;
646
647         TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
648
649         msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
650         if (msg_ctx->per_process_talloc_ctx == NULL) {
651                 return NT_STATUS_NO_MEMORY;
652         }
653
654         msg_ctx->id = (struct server_id) {
655                 .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
656         };
657
658         lck_path = lock_path(talloc_tos(), "msg.lock");
659         if (lck_path == NULL) {
660                 return NT_STATUS_NO_MEMORY;
661         }
662
663         ref = messaging_dgm_ref(
664                 msg_ctx->per_process_talloc_ctx,
665                 msg_ctx->event_ctx,
666                 &msg_ctx->id.unique_id,
667                 private_path("msg.sock"),
668                 lck_path,
669                 messaging_recv_cb,
670                 msg_ctx,
671                 &ret);
672
673         if (ref == NULL) {
674                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
675                 return map_nt_error_from_unix(ret);
676         }
677
678         if (lp_clustering()) {
679                 ref = messaging_ctdb_ref(
680                         msg_ctx->per_process_talloc_ctx,
681                         msg_ctx->event_ctx,
682                         lp_ctdbd_socket(),
683                         lp_ctdb_timeout(),
684                         msg_ctx->id.unique_id,
685                         messaging_recv_cb,
686                         msg_ctx,
687                         &ret);
688                 if (ref == NULL) {
689                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
690                                    strerror(ret));
691                         return map_nt_error_from_unix(ret);
692                 }
693         }
694
695         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
696         register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
697
698         return NT_STATUS_OK;
699 }
700
701
702 /*
703  * Register a dispatch function for a particular message type. Allow multiple
704  * registrants
705 */
706 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
707                             void *private_data,
708                             uint32_t msg_type,
709                             void (*fn)(struct messaging_context *msg,
710                                        void *private_data, 
711                                        uint32_t msg_type, 
712                                        struct server_id server_id,
713                                        DATA_BLOB *data))
714 {
715         struct messaging_callback *cb;
716
717         DEBUG(5, ("Registering messaging pointer for type %u - "
718                   "private_data=%p\n",
719                   (unsigned)msg_type, private_data));
720
721         /*
722          * Only one callback per type
723          */
724
725         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
726                 /* we allow a second registration of the same message
727                    type if it has a different private pointer. This is
728                    needed in, for example, the internal notify code,
729                    which creates a new notify context for each tree
730                    connect, and expects to receive messages to each of
731                    them. */
732                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
733                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
734                                   (unsigned)msg_type, private_data));
735                         cb->fn = fn;
736                         cb->private_data = private_data;
737                         return NT_STATUS_OK;
738                 }
739         }
740
741         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
742                 return NT_STATUS_NO_MEMORY;
743         }
744
745         cb->msg_type = msg_type;
746         cb->fn = fn;
747         cb->private_data = private_data;
748
749         DLIST_ADD(msg_ctx->callbacks, cb);
750         return NT_STATUS_OK;
751 }
752
753 /*
754   De-register the function for a particular message type.
755 */
756 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
757                           void *private_data)
758 {
759         struct messaging_callback *cb, *next;
760
761         for (cb = ctx->callbacks; cb; cb = next) {
762                 next = cb->next;
763                 if ((cb->msg_type == msg_type)
764                     && (cb->private_data == private_data)) {
765                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
766                                   (unsigned)msg_type, private_data));
767                         DLIST_REMOVE(ctx->callbacks, cb);
768                         TALLOC_FREE(cb);
769                 }
770         }
771 }
772
773 /*
774   Send a message to a particular server
775 */
776 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
777                         struct server_id server, uint32_t msg_type,
778                         const DATA_BLOB *data)
779 {
780         struct iovec iov = {0};
781
782         if (data != NULL) {
783                 iov.iov_base = data->data;
784                 iov.iov_len = data->length;
785         };
786
787         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
788 }
789
790 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
791                             struct server_id server, uint32_t msg_type,
792                             const uint8_t *buf, size_t len)
793 {
794         DATA_BLOB blob = data_blob_const(buf, len);
795         return messaging_send(msg_ctx, server, msg_type, &blob);
796 }
797
798 static int messaging_post_self(struct messaging_context *msg_ctx,
799                                struct server_id src, struct server_id dst,
800                                uint32_t msg_type,
801                                const struct iovec *iov, int iovlen,
802                                const int *fds, size_t num_fds)
803 {
804         struct messaging_rec *rec;
805         bool ok;
806
807         rec = messaging_rec_create(
808                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
809         if (rec == NULL) {
810                 return ENOMEM;
811         }
812
813         ok = messaging_alert_event_contexts(msg_ctx);
814         if (!ok) {
815                 TALLOC_FREE(rec);
816                 return ENOMEM;
817         }
818
819         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
820
821         return 0;
822 }
823
824 int messaging_send_iov_from(struct messaging_context *msg_ctx,
825                             struct server_id src, struct server_id dst,
826                             uint32_t msg_type,
827                             const struct iovec *iov, int iovlen,
828                             const int *fds, size_t num_fds)
829 {
830         int ret;
831         uint8_t hdr[MESSAGE_HDR_LENGTH];
832         struct iovec iov2[iovlen+1];
833
834         if (server_id_is_disconnected(&dst)) {
835                 return EINVAL;
836         }
837
838         if (num_fds > INT8_MAX) {
839                 return EINVAL;
840         }
841
842         if (server_id_equal(&dst, &msg_ctx->id)) {
843                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
844                                           iov, iovlen, fds, num_fds);
845                 return ret;
846         }
847
848         message_hdr_put(hdr, msg_type, src, dst);
849         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
850         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
851
852         if (dst.vnn != msg_ctx->id.vnn) {
853                 if (num_fds > 0) {
854                         return ENOSYS;
855                 }
856
857                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
858                 return ret;
859         }
860
861         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
862
863         if (ret == EACCES) {
864                 become_root();
865                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
866                                          fds, num_fds);
867                 unbecome_root();
868         }
869
870         if (ret == ECONNREFUSED) {
871                 /*
872                  * Linux returns this when a socket exists in the file
873                  * system without a listening process. This is not
874                  * documented in susv4 or the linux manpages, but it's
875                  * easily testable. For the higher levels this is the
876                  * same as "destination does not exist"
877                  */
878                 ret = ENOENT;
879         }
880
881         return ret;
882 }
883
884 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
885                             struct server_id dst, uint32_t msg_type,
886                             const struct iovec *iov, int iovlen,
887                             const int *fds, size_t num_fds)
888 {
889         int ret;
890
891         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
892                                       iov, iovlen, fds, num_fds);
893         if (ret != 0) {
894                 return map_nt_error_from_unix(ret);
895         }
896         return NT_STATUS_OK;
897 }
898
899 struct send_all_state {
900         struct messaging_context *msg_ctx;
901         int msg_type;
902         const void *buf;
903         size_t len;
904 };
905
906 static int send_all_fn(pid_t pid, void *private_data)
907 {
908         struct send_all_state *state = private_data;
909         NTSTATUS status;
910
911         if (pid == tevent_cached_getpid()) {
912                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
913                 return 0;
914         }
915
916         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
917                                     state->msg_type, state->buf, state->len);
918         if (!NT_STATUS_IS_OK(status)) {
919                 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
920                             (uintmax_t)pid, nt_errstr(status));
921         }
922
923         return 0;
924 }
925
926 void messaging_send_all(struct messaging_context *msg_ctx,
927                         int msg_type, const void *buf, size_t len)
928 {
929         struct send_all_state state = {
930                 .msg_ctx = msg_ctx, .msg_type = msg_type,
931                 .buf = buf, .len = len
932         };
933         int ret;
934
935 #ifdef CLUSTER_SUPPORT
936         if (lp_clustering()) {
937                 struct ctdbd_connection *conn = messaging_ctdb_connection();
938                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
939                 struct iovec iov[] = {
940                         { .iov_base = msghdr,
941                           .iov_len = sizeof(msghdr) },
942                         { .iov_base = discard_const_p(void, buf),
943                           .iov_len = len }
944                 };
945
946                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
947                                 (struct server_id) {0});
948
949                 ret = ctdbd_messaging_send_iov(
950                         conn, CTDB_BROADCAST_CONNECTED,
951                         CTDB_SRVID_SAMBA_PROCESS,
952                         iov, ARRAY_SIZE(iov));
953                 if (ret != 0) {
954                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
955                                     strerror(ret));
956                 }
957
958                 return;
959         }
960 #endif
961
962         ret = messaging_dgm_forall(send_all_fn, &state);
963         if (ret != 0) {
964                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
965                             strerror(ret));
966         }
967 }
968
969 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
970                                                struct messaging_rec *rec)
971 {
972         struct messaging_rec *result;
973         size_t fds_size = sizeof(int64_t) * rec->num_fds;
974         size_t payload_len;
975
976         payload_len = rec->buf.length + fds_size;
977         if (payload_len < rec->buf.length) {
978                 /* overflow */
979                 return NULL;
980         }
981
982         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
983                                       payload_len);
984         if (result == NULL) {
985                 return NULL;
986         }
987         *result = *rec;
988
989         /* Doesn't fail, see talloc_pooled_object */
990
991         result->buf.data = talloc_memdup(result, rec->buf.data,
992                                          rec->buf.length);
993
994         result->fds = NULL;
995         if (result->num_fds > 0) {
996                 size_t i;
997
998                 result->fds = talloc_memdup(result, rec->fds, fds_size);
999
1000                 for (i=0; i<rec->num_fds; i++) {
1001                         /*
1002                          * fd's can only exist once
1003                          */
1004                         rec->fds[i] = -1;
1005                 }
1006         }
1007
1008         return result;
1009 }
1010
1011 struct messaging_filtered_read_state {
1012         struct tevent_context *ev;
1013         struct messaging_context *msg_ctx;
1014         struct messaging_dgm_fde *fde;
1015         struct messaging_ctdb_fde *cluster_fde;
1016
1017         bool (*filter)(struct messaging_rec *rec, void *private_data);
1018         void *private_data;
1019
1020         struct messaging_rec *rec;
1021 };
1022
1023 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1024                                             enum tevent_req_state req_state);
1025
1026 struct tevent_req *messaging_filtered_read_send(
1027         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1028         struct messaging_context *msg_ctx,
1029         bool (*filter)(struct messaging_rec *rec, void *private_data),
1030         void *private_data)
1031 {
1032         struct tevent_req *req;
1033         struct messaging_filtered_read_state *state;
1034         size_t new_waiters_len;
1035         bool ok;
1036
1037         req = tevent_req_create(mem_ctx, &state,
1038                                 struct messaging_filtered_read_state);
1039         if (req == NULL) {
1040                 return NULL;
1041         }
1042         state->ev = ev;
1043         state->msg_ctx = msg_ctx;
1044         state->filter = filter;
1045         state->private_data = private_data;
1046
1047         /*
1048          * We have to defer the callback here, as we might be called from
1049          * within a different tevent_context than state->ev
1050          */
1051         tevent_req_defer_callback(req, state->ev);
1052
1053         state->fde = messaging_dgm_register_tevent_context(state, ev);
1054         if (tevent_req_nomem(state->fde, req)) {
1055                 return tevent_req_post(req, ev);
1056         }
1057
1058         if (lp_clustering()) {
1059                 state->cluster_fde =
1060                         messaging_ctdb_register_tevent_context(state, ev);
1061                 if (tevent_req_nomem(state->cluster_fde, req)) {
1062                         return tevent_req_post(req, ev);
1063                 }
1064         }
1065
1066         /*
1067          * We add ourselves to the "new_waiters" array, not the "waiters"
1068          * array. If we are called from within messaging_read_done,
1069          * messaging_dispatch_rec will be in an active for-loop on
1070          * "waiters". We must be careful not to mess with this array, because
1071          * it could mean that a single event is being delivered twice.
1072          */
1073
1074         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1075
1076         if (new_waiters_len == msg_ctx->num_new_waiters) {
1077                 struct tevent_req **tmp;
1078
1079                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1080                                      struct tevent_req *, new_waiters_len+1);
1081                 if (tevent_req_nomem(tmp, req)) {
1082                         return tevent_req_post(req, ev);
1083                 }
1084                 msg_ctx->new_waiters = tmp;
1085         }
1086
1087         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1088         msg_ctx->num_new_waiters += 1;
1089         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1090
1091         ok = messaging_register_event_context(msg_ctx, ev);
1092         if (!ok) {
1093                 tevent_req_oom(req);
1094                 return tevent_req_post(req, ev);
1095         }
1096
1097         return req;
1098 }
1099
1100 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1101                                             enum tevent_req_state req_state)
1102 {
1103         struct messaging_filtered_read_state *state = tevent_req_data(
1104                 req, struct messaging_filtered_read_state);
1105         struct messaging_context *msg_ctx = state->msg_ctx;
1106         size_t i;
1107         bool ok;
1108
1109         tevent_req_set_cleanup_fn(req, NULL);
1110
1111         TALLOC_FREE(state->fde);
1112         TALLOC_FREE(state->cluster_fde);
1113
1114         ok = messaging_deregister_event_context(msg_ctx, state->ev);
1115         if (!ok) {
1116                 abort();
1117         }
1118
1119         /*
1120          * Just set the [new_]waiters entry to NULL, be careful not to mess
1121          * with the other "waiters" array contents. We are often called from
1122          * within "messaging_dispatch_rec", which loops over
1123          * "waiters". Messing with the "waiters" array will mess up that
1124          * for-loop.
1125          */
1126
1127         for (i=0; i<msg_ctx->num_waiters; i++) {
1128                 if (msg_ctx->waiters[i] == req) {
1129                         msg_ctx->waiters[i] = NULL;
1130                         return;
1131                 }
1132         }
1133
1134         for (i=0; i<msg_ctx->num_new_waiters; i++) {
1135                 if (msg_ctx->new_waiters[i] == req) {
1136                         msg_ctx->new_waiters[i] = NULL;
1137                         return;
1138                 }
1139         }
1140 }
1141
1142 static void messaging_filtered_read_done(struct tevent_req *req,
1143                                          struct messaging_rec *rec)
1144 {
1145         struct messaging_filtered_read_state *state = tevent_req_data(
1146                 req, struct messaging_filtered_read_state);
1147
1148         state->rec = messaging_rec_dup(state, rec);
1149         if (tevent_req_nomem(state->rec, req)) {
1150                 return;
1151         }
1152         tevent_req_done(req);
1153 }
1154
1155 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1156                                  struct messaging_rec **presult)
1157 {
1158         struct messaging_filtered_read_state *state = tevent_req_data(
1159                 req, struct messaging_filtered_read_state);
1160         int err;
1161
1162         if (tevent_req_is_unix_error(req, &err)) {
1163                 tevent_req_received(req);
1164                 return err;
1165         }
1166         if (presult != NULL) {
1167                 *presult = talloc_move(mem_ctx, &state->rec);
1168         }
1169         tevent_req_received(req);
1170         return 0;
1171 }
1172
1173 struct messaging_read_state {
1174         uint32_t msg_type;
1175         struct messaging_rec *rec;
1176 };
1177
1178 static bool messaging_read_filter(struct messaging_rec *rec,
1179                                   void *private_data);
1180 static void messaging_read_done(struct tevent_req *subreq);
1181
1182 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1183                                        struct tevent_context *ev,
1184                                        struct messaging_context *msg,
1185                                        uint32_t msg_type)
1186 {
1187         struct tevent_req *req, *subreq;
1188         struct messaging_read_state *state;
1189
1190         req = tevent_req_create(mem_ctx, &state,
1191                                 struct messaging_read_state);
1192         if (req == NULL) {
1193                 return NULL;
1194         }
1195         state->msg_type = msg_type;
1196
1197         subreq = messaging_filtered_read_send(state, ev, msg,
1198                                               messaging_read_filter, state);
1199         if (tevent_req_nomem(subreq, req)) {
1200                 return tevent_req_post(req, ev);
1201         }
1202         tevent_req_set_callback(subreq, messaging_read_done, req);
1203         return req;
1204 }
1205
1206 static bool messaging_read_filter(struct messaging_rec *rec,
1207                                   void *private_data)
1208 {
1209         struct messaging_read_state *state = talloc_get_type_abort(
1210                 private_data, struct messaging_read_state);
1211
1212         if (rec->num_fds != 0) {
1213                 return false;
1214         }
1215
1216         return rec->msg_type == state->msg_type;
1217 }
1218
1219 static void messaging_read_done(struct tevent_req *subreq)
1220 {
1221         struct tevent_req *req = tevent_req_callback_data(
1222                 subreq, struct tevent_req);
1223         struct messaging_read_state *state = tevent_req_data(
1224                 req, struct messaging_read_state);
1225         int ret;
1226
1227         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1228         TALLOC_FREE(subreq);
1229         if (tevent_req_error(req, ret)) {
1230                 return;
1231         }
1232         tevent_req_done(req);
1233 }
1234
1235 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1236                         struct messaging_rec **presult)
1237 {
1238         struct messaging_read_state *state = tevent_req_data(
1239                 req, struct messaging_read_state);
1240         int err;
1241
1242         if (tevent_req_is_unix_error(req, &err)) {
1243                 return err;
1244         }
1245         if (presult != NULL) {
1246                 *presult = talloc_move(mem_ctx, &state->rec);
1247         }
1248         return 0;
1249 }
1250
1251 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1252 {
1253         if (msg_ctx->num_new_waiters == 0) {
1254                 return true;
1255         }
1256
1257         if (talloc_array_length(msg_ctx->waiters) <
1258             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1259                 struct tevent_req **tmp;
1260                 tmp = talloc_realloc(
1261                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1262                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1263                 if (tmp == NULL) {
1264                         DEBUG(1, ("%s: talloc failed\n", __func__));
1265                         return false;
1266                 }
1267                 msg_ctx->waiters = tmp;
1268         }
1269
1270         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1271                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1272
1273         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1274         msg_ctx->num_new_waiters = 0;
1275
1276         return true;
1277 }
1278
1279 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1280                                        struct messaging_rec *rec)
1281 {
1282         struct messaging_callback *cb, *next;
1283
1284         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1285                 size_t j;
1286
1287                 next = cb->next;
1288                 if (cb->msg_type != rec->msg_type) {
1289                         continue;
1290                 }
1291
1292                 /*
1293                  * the old style callbacks don't support fd passing
1294                  */
1295                 for (j=0; j < rec->num_fds; j++) {
1296                         int fd = rec->fds[j];
1297                         close(fd);
1298                 }
1299                 rec->num_fds = 0;
1300                 rec->fds = NULL;
1301
1302                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1303                        rec->src, &rec->buf);
1304
1305                 return true;
1306         }
1307
1308         return false;
1309 }
1310
1311 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1312                                        struct tevent_context *ev,
1313                                        struct messaging_rec *rec)
1314 {
1315         size_t i;
1316
1317         if (!messaging_append_new_waiters(msg_ctx)) {
1318                 return false;
1319         }
1320
1321         i = 0;
1322         while (i < msg_ctx->num_waiters) {
1323                 struct tevent_req *req;
1324                 struct messaging_filtered_read_state *state;
1325
1326                 req = msg_ctx->waiters[i];
1327                 if (req == NULL) {
1328                         /*
1329                          * This got cleaned up. In the meantime,
1330                          * move everything down one. We need
1331                          * to keep the order of waiters, as
1332                          * other code may depend on this.
1333                          */
1334                         ARRAY_DEL_ELEMENT(
1335                                 msg_ctx->waiters, i, msg_ctx->num_waiters);
1336                         msg_ctx->num_waiters -= 1;
1337                         continue;
1338                 }
1339
1340                 state = tevent_req_data(
1341                         req, struct messaging_filtered_read_state);
1342                 if ((ev == state->ev) &&
1343                     state->filter(rec, state->private_data)) {
1344                         messaging_filtered_read_done(req, rec);
1345                         return true;
1346                 }
1347
1348                 i += 1;
1349         }
1350
1351         return false;
1352 }
1353
1354 /*
1355   Dispatch one messaging_rec
1356 */
1357 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1358                                    struct tevent_context *ev,
1359                                    struct messaging_rec *rec)
1360 {
1361         bool consumed;
1362         size_t i;
1363
1364         if (ev == msg_ctx->event_ctx) {
1365                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1366                 if (consumed) {
1367                         return;
1368                 }
1369         }
1370
1371         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1372         if (consumed) {
1373                 return;
1374         }
1375
1376         if (ev != msg_ctx->event_ctx) {
1377                 struct iovec iov;
1378                 int fds[MAX(1, rec->num_fds)];
1379                 int ret;
1380
1381                 /*
1382                  * We've been listening on a nested event
1383                  * context. Messages need to be handled in the main
1384                  * event context, so post to ourselves
1385                  */
1386
1387                 iov.iov_base = rec->buf.data;
1388                 iov.iov_len = rec->buf.length;
1389
1390                 for (i=0; i<rec->num_fds; i++) {
1391                         fds[i] = rec->fds[i];
1392                 }
1393
1394                 ret = messaging_post_self(
1395                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1396                         &iov, 1, fds, rec->num_fds);
1397                 if (ret == 0) {
1398                         return;
1399                 }
1400         }
1401 }
1402
1403 static int mess_parent_dgm_cleanup(void *private_data);
1404 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1405
1406 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1407 {
1408         struct tevent_req *req;
1409
1410         req = background_job_send(
1411                 msg, msg->event_ctx, msg, NULL, 0,
1412                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1413                             60*15),
1414                 mess_parent_dgm_cleanup, msg);
1415         if (req == NULL) {
1416                 DBG_WARNING("background_job_send failed\n");
1417                 return false;
1418         }
1419         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1420         return true;
1421 }
1422
1423 static int mess_parent_dgm_cleanup(void *private_data)
1424 {
1425         int ret;
1426
1427         ret = messaging_dgm_wipe();
1428         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1429                    ret ? strerror(ret) : "ok"));
1430         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1431                            60*15);
1432 }
1433
1434 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1435 {
1436         struct messaging_context *msg = tevent_req_callback_data(
1437                 req, struct messaging_context);
1438         NTSTATUS status;
1439
1440         status = background_job_recv(req);
1441         TALLOC_FREE(req);
1442         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1443                   nt_errstr(status)));
1444
1445         req = background_job_send(
1446                 msg, msg->event_ctx, msg, NULL, 0,
1447                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1448                             60*15),
1449                 mess_parent_dgm_cleanup, msg);
1450         if (req == NULL) {
1451                 DEBUG(1, ("background_job_send failed\n"));
1452                 return;
1453         }
1454         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1455 }
1456
1457 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1458 {
1459         int ret;
1460
1461         if (pid == 0) {
1462                 ret = messaging_dgm_wipe();
1463         } else {
1464                 ret = messaging_dgm_cleanup(pid);
1465         }
1466
1467         return ret;
1468 }
1469
1470 struct tevent_context *messaging_tevent_context(
1471         struct messaging_context *msg_ctx)
1472 {
1473         return msg_ctx->event_ctx;
1474 }
1475
1476 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1477 {
1478         return msg_ctx->names_db;
1479 }
1480
1481 /** @} **/