docs: fix a typo in history file
[bbaumbach/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messaging/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messaging/messages_dgm_ref.h"
59 #include "lib/messages_ctdb.h"
60 #include "lib/messages_ctdb_ref.h"
61 #include "lib/messages_util.h"
62 #include "cluster_support.h"
63 #include "ctdbd_conn.h"
64 #include "ctdb_srvids.h"
65
66 #ifdef CLUSTER_SUPPORT
67 #include "ctdb_protocol.h"
68 #endif
69
70 struct messaging_callback {
71         struct messaging_callback *prev, *next;
72         uint32_t msg_type;
73         void (*fn)(struct messaging_context *msg, void *private_data, 
74                    uint32_t msg_type, 
75                    struct server_id server_id, DATA_BLOB *data);
76         void *private_data;
77 };
78
79 struct messaging_registered_ev {
80         struct tevent_context *ev;
81         struct tevent_immediate *im;
82         size_t refcount;
83 };
84
85 struct messaging_context {
86         struct server_id id;
87         struct tevent_context *event_ctx;
88         struct messaging_callback *callbacks;
89
90         struct messaging_rec *posted_msgs;
91
92         struct messaging_registered_ev *event_contexts;
93
94         struct tevent_req **new_waiters;
95         size_t num_new_waiters;
96
97         struct tevent_req **waiters;
98         size_t num_waiters;
99
100         struct server_id_db *names_db;
101
102         TALLOC_CTX *per_process_talloc_ctx;
103 };
104
105 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
106                                                struct messaging_rec *rec);
107 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
108                                        struct messaging_rec *rec);
109 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
110                                        struct tevent_context *ev,
111                                        struct messaging_rec *rec);
112 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
113                                    struct tevent_context *ev,
114                                    struct messaging_rec *rec);
115
116 /****************************************************************************
117  A useful function for testing the message system.
118 ****************************************************************************/
119
120 static void ping_message(struct messaging_context *msg_ctx,
121                          void *private_data,
122                          uint32_t msg_type,
123                          struct server_id src,
124                          DATA_BLOB *data)
125 {
126         struct server_id_buf idbuf;
127
128         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
129                   server_id_str_buf(src, &idbuf), (int)data->length,
130                   data->data ? (char *)data->data : ""));
131
132         messaging_send(msg_ctx, src, MSG_PONG, data);
133 }
134
135 struct messaging_rec *messaging_rec_create(
136         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
137         uint32_t msg_type, const struct iovec *iov, int iovlen,
138         const int *fds, size_t num_fds)
139 {
140         ssize_t buflen;
141         uint8_t *buf;
142         struct messaging_rec *result;
143
144         if (num_fds > INT8_MAX) {
145                 return NULL;
146         }
147
148         buflen = iov_buflen(iov, iovlen);
149         if (buflen == -1) {
150                 return NULL;
151         }
152         buf = talloc_array(mem_ctx, uint8_t, buflen);
153         if (buf == NULL) {
154                 return NULL;
155         }
156         iov_buf(iov, iovlen, buf, buflen);
157
158         {
159                 struct messaging_rec rec;
160                 int64_t fds64[MAX(1, num_fds)];
161                 size_t i;
162
163                 for (i=0; i<num_fds; i++) {
164                         fds64[i] = fds[i];
165                 }
166
167                 rec = (struct messaging_rec) {
168                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
169                         .src = src, .dest = dst,
170                         .buf.data = buf, .buf.length = buflen,
171                         .num_fds = num_fds, .fds = fds64,
172                 };
173
174                 result = messaging_rec_dup(mem_ctx, &rec);
175         }
176
177         TALLOC_FREE(buf);
178
179         return result;
180 }
181
182 static bool messaging_register_event_context(struct messaging_context *ctx,
183                                              struct tevent_context *ev)
184 {
185         size_t i, num_event_contexts;
186         struct messaging_registered_ev *free_reg = NULL;
187         struct messaging_registered_ev *tmp;
188
189         num_event_contexts = talloc_array_length(ctx->event_contexts);
190
191         for (i=0; i<num_event_contexts; i++) {
192                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
193
194                 if (reg->refcount == 0) {
195                         if (reg->ev != NULL) {
196                                 abort();
197                         }
198                         free_reg = reg;
199                         /*
200                          * We continue here and may find another
201                          * free_req, but the important thing is
202                          * that we continue to search for an
203                          * existing registration in the loop.
204                          */
205                         continue;
206                 }
207
208                 if (reg->ev == ev) {
209                         reg->refcount += 1;
210                         return true;
211                 }
212         }
213
214         if (free_reg == NULL) {
215                 struct tevent_immediate *im = NULL;
216
217                 im = tevent_create_immediate(ctx);
218                 if (im == NULL) {
219                         return false;
220                 }
221
222                 tmp = talloc_realloc(ctx, ctx->event_contexts,
223                                      struct messaging_registered_ev,
224                                      num_event_contexts+1);
225                 if (tmp == NULL) {
226                         return false;
227                 }
228                 ctx->event_contexts = tmp;
229
230                 free_reg = &ctx->event_contexts[num_event_contexts];
231                 free_reg->im = talloc_move(ctx->event_contexts, &im);
232         }
233
234         /*
235          * free_reg->im might be cached
236          */
237         free_reg->ev = ev;
238         free_reg->refcount = 1;
239
240         return true;
241 }
242
243 static bool messaging_deregister_event_context(struct messaging_context *ctx,
244                                                struct tevent_context *ev)
245 {
246         size_t i, num_event_contexts;
247
248         num_event_contexts = talloc_array_length(ctx->event_contexts);
249
250         for (i=0; i<num_event_contexts; i++) {
251                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
252
253                 if (reg->refcount == 0) {
254                         continue;
255                 }
256
257                 if (reg->ev == ev) {
258                         reg->refcount -= 1;
259
260                         if (reg->refcount == 0) {
261                                 /*
262                                  * The primary event context
263                                  * is never unregistered using
264                                  * messaging_deregister_event_context()
265                                  * it's only registered using
266                                  * messaging_register_event_context().
267                                  */
268                                 SMB_ASSERT(ev != ctx->event_ctx);
269                                 SMB_ASSERT(reg->ev != ctx->event_ctx);
270
271                                 /*
272                                  * Not strictly necessary, just
273                                  * paranoia
274                                  */
275                                 reg->ev = NULL;
276
277                                 /*
278                                  * Do not talloc_free(reg->im),
279                                  * recycle immediates events.
280                                  *
281                                  * We just invalidate it using
282                                  * the primary event context,
283                                  * which is never unregistered.
284                                  */
285                                 tevent_schedule_immediate(reg->im,
286                                                           ctx->event_ctx,
287                                                           NULL, NULL);
288                         }
289                         return true;
290                 }
291         }
292         return false;
293 }
294
295 static void messaging_post_main_event_context(struct tevent_context *ev,
296                                               struct tevent_immediate *im,
297                                               void *private_data)
298 {
299         struct messaging_context *ctx = talloc_get_type_abort(
300                 private_data, struct messaging_context);
301
302         while (ctx->posted_msgs != NULL) {
303                 struct messaging_rec *rec = ctx->posted_msgs;
304                 bool consumed;
305
306                 DLIST_REMOVE(ctx->posted_msgs, rec);
307
308                 consumed = messaging_dispatch_classic(ctx, rec);
309                 if (!consumed) {
310                         consumed = messaging_dispatch_waiters(
311                                 ctx, ctx->event_ctx, rec);
312                 }
313
314                 if (!consumed) {
315                         uint8_t i;
316
317                         for (i=0; i<rec->num_fds; i++) {
318                                 close(rec->fds[i]);
319                         }
320                 }
321
322                 TALLOC_FREE(rec);
323         }
324 }
325
326 static void messaging_post_sub_event_context(struct tevent_context *ev,
327                                              struct tevent_immediate *im,
328                                              void *private_data)
329 {
330         struct messaging_context *ctx = talloc_get_type_abort(
331                 private_data, struct messaging_context);
332         struct messaging_rec *rec, *next;
333
334         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
335                 bool consumed;
336
337                 next = rec->next;
338
339                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
340                 if (consumed) {
341                         DLIST_REMOVE(ctx->posted_msgs, rec);
342                         TALLOC_FREE(rec);
343                 }
344         }
345 }
346
347 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
348 {
349         size_t i, num_event_contexts;
350
351         num_event_contexts = talloc_array_length(ctx->event_contexts);
352
353         for (i=0; i<num_event_contexts; i++) {
354                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
355
356                 if (reg->refcount == 0) {
357                         continue;
358                 }
359
360                 /*
361                  * We depend on schedule_immediate to work
362                  * multiple times. Might be a bit inefficient,
363                  * but this needs to be proven in tests. The
364                  * alternatively would be to track whether the
365                  * immediate has already been scheduled. For
366                  * now, avoid that complexity here.
367                  */
368
369                 if (reg->ev == ctx->event_ctx) {
370                         tevent_schedule_immediate(
371                                 reg->im, reg->ev,
372                                 messaging_post_main_event_context,
373                                 ctx);
374                 } else {
375                         tevent_schedule_immediate(
376                                 reg->im, reg->ev,
377                                 messaging_post_sub_event_context,
378                                 ctx);
379                 }
380
381         }
382         return true;
383 }
384
385 static void messaging_recv_cb(struct tevent_context *ev,
386                               const uint8_t *msg, size_t msg_len,
387                               int *fds, size_t num_fds,
388                               void *private_data)
389 {
390         struct messaging_context *msg_ctx = talloc_get_type_abort(
391                 private_data, struct messaging_context);
392         struct server_id_buf idbuf;
393         struct messaging_rec rec;
394         int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
395         size_t i;
396
397         if (msg_len < MESSAGE_HDR_LENGTH) {
398                 DBG_WARNING("message too short: %zu\n", msg_len);
399                 return;
400         }
401
402         if (num_fds > INT8_MAX) {
403                 DBG_WARNING("too many fds: %zu\n", num_fds);
404                 return;
405         }
406
407         for (i=0; i < num_fds; i++) {
408                 fds64[i] = fds[i];
409         }
410
411         rec = (struct messaging_rec) {
412                 .msg_version = MESSAGE_VERSION,
413                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
414                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
415                 .num_fds = num_fds,
416                 .fds = fds64,
417         };
418
419         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
420
421         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
422                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
423                   server_id_str_buf(rec.src, &idbuf));
424
425         if (server_id_same_process(&rec.src, &msg_ctx->id)) {
426                 DBG_DEBUG("Ignoring self-send\n");
427                 return;
428         }
429
430         messaging_dispatch_rec(msg_ctx, ev, &rec);
431
432         for (i=0; i<num_fds; i++) {
433                 fds[i] = fds64[i];
434         }
435 }
436
437 static int messaging_context_destructor(struct messaging_context *ctx)
438 {
439         size_t i;
440
441         for (i=0; i<ctx->num_new_waiters; i++) {
442                 if (ctx->new_waiters[i] != NULL) {
443                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
444                         ctx->new_waiters[i] = NULL;
445                 }
446         }
447         for (i=0; i<ctx->num_waiters; i++) {
448                 if (ctx->waiters[i] != NULL) {
449                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
450                         ctx->waiters[i] = NULL;
451                 }
452         }
453
454         /*
455          * The immediates from messaging_alert_event_contexts
456          * reference "ctx". Don't let them outlive the
457          * messaging_context we're destroying here.
458          */
459         TALLOC_FREE(ctx->event_contexts);
460
461         return 0;
462 }
463
464 static const char *private_path(const char *name)
465 {
466         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
467 }
468
469 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
470                                         struct tevent_context *ev,
471                                         struct messaging_context **pmsg_ctx)
472 {
473         TALLOC_CTX *frame;
474         struct messaging_context *ctx;
475         NTSTATUS status;
476         int ret;
477         const char *lck_path;
478         const char *priv_path;
479         void *ref;
480         bool ok;
481
482         /*
483          * sec_init() *must* be called before any other
484          * functions that use sec_XXX(). e.g. sec_initial_uid().
485          */
486
487         sec_init();
488
489         lck_path = lock_path(talloc_tos(), "msg.lock");
490         if (lck_path == NULL) {
491                 return NT_STATUS_NO_MEMORY;
492         }
493
494         ok = directory_create_or_exist_strict(lck_path,
495                                               sec_initial_uid(),
496                                               0755);
497         if (!ok) {
498                 DBG_DEBUG("Could not create lock directory: %s\n",
499                           strerror(errno));
500                 return NT_STATUS_ACCESS_DENIED;
501         }
502
503         priv_path = private_path("msg.sock");
504         if (priv_path == NULL) {
505                 return NT_STATUS_NO_MEMORY;
506         }
507
508         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
509                                               0700);
510         if (!ok) {
511                 DBG_DEBUG("Could not create msg directory: %s\n",
512                           strerror(errno));
513                 return NT_STATUS_ACCESS_DENIED;
514         }
515
516         frame = talloc_stackframe();
517         if (frame == NULL) {
518                 return NT_STATUS_NO_MEMORY;
519         }
520
521         ctx = talloc_zero(frame, struct messaging_context);
522         if (ctx == NULL) {
523                 status = NT_STATUS_NO_MEMORY;
524                 goto done;
525         }
526
527         ctx->id = (struct server_id) {
528                 .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
529         };
530
531         ctx->event_ctx = ev;
532
533         ctx->per_process_talloc_ctx = talloc_new(ctx);
534         if (ctx->per_process_talloc_ctx == NULL) {
535                 status = NT_STATUS_NO_MEMORY;
536                 goto done;
537         }
538
539         ok = messaging_register_event_context(ctx, ev);
540         if (!ok) {
541                 status = NT_STATUS_NO_MEMORY;
542                 goto done;
543         }
544
545         ref = messaging_dgm_ref(
546                 ctx->per_process_talloc_ctx,
547                 ctx->event_ctx,
548                 &ctx->id.unique_id,
549                 priv_path,
550                 lck_path,
551                 messaging_recv_cb,
552                 ctx,
553                 &ret);
554         if (ref == NULL) {
555                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
556                 status = map_nt_error_from_unix(ret);
557                 goto done;
558         }
559         talloc_set_destructor(ctx, messaging_context_destructor);
560
561 #ifdef CLUSTER_SUPPORT
562         if (lp_clustering()) {
563                 ref = messaging_ctdb_ref(
564                         ctx->per_process_talloc_ctx,
565                         ctx->event_ctx,
566                         lp_ctdbd_socket(),
567                         lp_ctdb_timeout(),
568                         ctx->id.unique_id,
569                         messaging_recv_cb,
570                         ctx,
571                         &ret);
572                 if (ref == NULL) {
573                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
574                                    strerror(ret));
575                         status = map_nt_error_from_unix(ret);
576                         goto done;
577                 }
578         }
579 #endif
580
581         ctx->id.vnn = get_my_vnn();
582
583         ctx->names_db = server_id_db_init(ctx,
584                                           ctx->id,
585                                           lp_lock_directory(),
586                                           0,
587                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
588         if (ctx->names_db == NULL) {
589                 DBG_DEBUG("server_id_db_init failed\n");
590                 status = NT_STATUS_NO_MEMORY;
591                 goto done;
592         }
593
594         messaging_register(ctx, NULL, MSG_PING, ping_message);
595
596         /* Register some debugging related messages */
597
598         register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
599         register_dmalloc_msgs(ctx);
600         debug_register_msgs(ctx);
601
602         {
603                 struct server_id_buf tmp;
604                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
605         }
606
607         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
608
609         status = NT_STATUS_OK;
610 done:
611         TALLOC_FREE(frame);
612
613         return status;
614 }
615
616 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
617                                          struct tevent_context *ev)
618 {
619         struct messaging_context *ctx = NULL;
620         NTSTATUS status;
621
622         status = messaging_init_internal(mem_ctx,
623                                          ev,
624                                          &ctx);
625         if (!NT_STATUS_IS_OK(status)) {
626                 return NULL;
627         }
628
629         return ctx;
630 }
631
632 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
633 {
634         return msg_ctx->id;
635 }
636
637 /*
638  * re-init after a fork
639  */
640 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
641 {
642         int ret;
643         char *lck_path;
644         void *ref;
645
646         TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
647
648         msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
649         if (msg_ctx->per_process_talloc_ctx == NULL) {
650                 return NT_STATUS_NO_MEMORY;
651         }
652
653         msg_ctx->id = (struct server_id) {
654                 .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
655         };
656
657         lck_path = lock_path(talloc_tos(), "msg.lock");
658         if (lck_path == NULL) {
659                 return NT_STATUS_NO_MEMORY;
660         }
661
662         ref = messaging_dgm_ref(
663                 msg_ctx->per_process_talloc_ctx,
664                 msg_ctx->event_ctx,
665                 &msg_ctx->id.unique_id,
666                 private_path("msg.sock"),
667                 lck_path,
668                 messaging_recv_cb,
669                 msg_ctx,
670                 &ret);
671
672         if (ref == NULL) {
673                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
674                 return map_nt_error_from_unix(ret);
675         }
676
677         if (lp_clustering()) {
678                 ref = messaging_ctdb_ref(
679                         msg_ctx->per_process_talloc_ctx,
680                         msg_ctx->event_ctx,
681                         lp_ctdbd_socket(),
682                         lp_ctdb_timeout(),
683                         msg_ctx->id.unique_id,
684                         messaging_recv_cb,
685                         msg_ctx,
686                         &ret);
687                 if (ref == NULL) {
688                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
689                                    strerror(ret));
690                         return map_nt_error_from_unix(ret);
691                 }
692         }
693
694         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
695         register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
696
697         return NT_STATUS_OK;
698 }
699
700
701 /*
702  * Register a dispatch function for a particular message type. Allow multiple
703  * registrants
704 */
705 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
706                             void *private_data,
707                             uint32_t msg_type,
708                             void (*fn)(struct messaging_context *msg,
709                                        void *private_data, 
710                                        uint32_t msg_type, 
711                                        struct server_id server_id,
712                                        DATA_BLOB *data))
713 {
714         struct messaging_callback *cb;
715
716         DEBUG(5, ("Registering messaging pointer for type %u - "
717                   "private_data=%p\n",
718                   (unsigned)msg_type, private_data));
719
720         /*
721          * Only one callback per type
722          */
723
724         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
725                 /* we allow a second registration of the same message
726                    type if it has a different private pointer. This is
727                    needed in, for example, the internal notify code,
728                    which creates a new notify context for each tree
729                    connect, and expects to receive messages to each of
730                    them. */
731                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
732                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
733                                   (unsigned)msg_type, private_data));
734                         cb->fn = fn;
735                         cb->private_data = private_data;
736                         return NT_STATUS_OK;
737                 }
738         }
739
740         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
741                 return NT_STATUS_NO_MEMORY;
742         }
743
744         cb->msg_type = msg_type;
745         cb->fn = fn;
746         cb->private_data = private_data;
747
748         DLIST_ADD(msg_ctx->callbacks, cb);
749         return NT_STATUS_OK;
750 }
751
752 /*
753   De-register the function for a particular message type.
754 */
755 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
756                           void *private_data)
757 {
758         struct messaging_callback *cb, *next;
759
760         for (cb = ctx->callbacks; cb; cb = next) {
761                 next = cb->next;
762                 if ((cb->msg_type == msg_type)
763                     && (cb->private_data == private_data)) {
764                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
765                                   (unsigned)msg_type, private_data));
766                         DLIST_REMOVE(ctx->callbacks, cb);
767                         TALLOC_FREE(cb);
768                 }
769         }
770 }
771
772 /*
773   Send a message to a particular server
774 */
775 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
776                         struct server_id server, uint32_t msg_type,
777                         const DATA_BLOB *data)
778 {
779         struct iovec iov = {0};
780
781         if (data != NULL) {
782                 iov.iov_base = data->data;
783                 iov.iov_len = data->length;
784         };
785
786         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
787 }
788
789 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
790                             struct server_id server, uint32_t msg_type,
791                             const uint8_t *buf, size_t len)
792 {
793         DATA_BLOB blob = data_blob_const(buf, len);
794         return messaging_send(msg_ctx, server, msg_type, &blob);
795 }
796
797 static int messaging_post_self(struct messaging_context *msg_ctx,
798                                struct server_id src, struct server_id dst,
799                                uint32_t msg_type,
800                                const struct iovec *iov, int iovlen,
801                                const int *fds, size_t num_fds)
802 {
803         struct messaging_rec *rec;
804         bool ok;
805
806         rec = messaging_rec_create(
807                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
808         if (rec == NULL) {
809                 return ENOMEM;
810         }
811
812         ok = messaging_alert_event_contexts(msg_ctx);
813         if (!ok) {
814                 TALLOC_FREE(rec);
815                 return ENOMEM;
816         }
817
818         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
819
820         return 0;
821 }
822
823 int messaging_send_iov_from(struct messaging_context *msg_ctx,
824                             struct server_id src, struct server_id dst,
825                             uint32_t msg_type,
826                             const struct iovec *iov, int iovlen,
827                             const int *fds, size_t num_fds)
828 {
829         int ret;
830         uint8_t hdr[MESSAGE_HDR_LENGTH];
831         struct iovec iov2[iovlen+1];
832
833         if (server_id_is_disconnected(&dst)) {
834                 return EINVAL;
835         }
836
837         if (num_fds > INT8_MAX) {
838                 return EINVAL;
839         }
840
841         if (server_id_equal(&dst, &msg_ctx->id)) {
842                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
843                                           iov, iovlen, fds, num_fds);
844                 return ret;
845         }
846
847         message_hdr_put(hdr, msg_type, src, dst);
848         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
849         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
850
851         if (dst.vnn != msg_ctx->id.vnn) {
852                 if (num_fds > 0) {
853                         return ENOSYS;
854                 }
855
856                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
857                 return ret;
858         }
859
860         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
861
862         if (ret == EACCES) {
863                 become_root();
864                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
865                                          fds, num_fds);
866                 unbecome_root();
867         }
868
869         if (ret == ECONNREFUSED) {
870                 /*
871                  * Linux returns this when a socket exists in the file
872                  * system without a listening process. This is not
873                  * documented in susv4 or the linux manpages, but it's
874                  * easily testable. For the higher levels this is the
875                  * same as "destination does not exist"
876                  */
877                 ret = ENOENT;
878         }
879
880         return ret;
881 }
882
883 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
884                             struct server_id dst, uint32_t msg_type,
885                             const struct iovec *iov, int iovlen,
886                             const int *fds, size_t num_fds)
887 {
888         int ret;
889
890         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
891                                       iov, iovlen, fds, num_fds);
892         if (ret != 0) {
893                 return map_nt_error_from_unix(ret);
894         }
895         return NT_STATUS_OK;
896 }
897
898 struct send_all_state {
899         struct messaging_context *msg_ctx;
900         int msg_type;
901         const void *buf;
902         size_t len;
903 };
904
905 static int send_all_fn(pid_t pid, void *private_data)
906 {
907         struct send_all_state *state = private_data;
908         NTSTATUS status;
909
910         if (pid == tevent_cached_getpid()) {
911                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
912                 return 0;
913         }
914
915         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
916                                     state->msg_type, state->buf, state->len);
917         if (!NT_STATUS_IS_OK(status)) {
918                 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
919                             (uintmax_t)pid, nt_errstr(status));
920         }
921
922         return 0;
923 }
924
925 void messaging_send_all(struct messaging_context *msg_ctx,
926                         int msg_type, const void *buf, size_t len)
927 {
928         struct send_all_state state = {
929                 .msg_ctx = msg_ctx, .msg_type = msg_type,
930                 .buf = buf, .len = len
931         };
932         int ret;
933
934 #ifdef CLUSTER_SUPPORT
935         if (lp_clustering()) {
936                 struct ctdbd_connection *conn = messaging_ctdb_connection();
937                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
938                 struct iovec iov[] = {
939                         { .iov_base = msghdr,
940                           .iov_len = sizeof(msghdr) },
941                         { .iov_base = discard_const_p(void, buf),
942                           .iov_len = len }
943                 };
944
945                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
946                                 (struct server_id) {0});
947
948                 ret = ctdbd_messaging_send_iov(
949                         conn, CTDB_BROADCAST_CONNECTED,
950                         CTDB_SRVID_SAMBA_PROCESS,
951                         iov, ARRAY_SIZE(iov));
952                 if (ret != 0) {
953                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
954                                     strerror(ret));
955                 }
956
957                 return;
958         }
959 #endif
960
961         ret = messaging_dgm_forall(send_all_fn, &state);
962         if (ret != 0) {
963                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
964                             strerror(ret));
965         }
966 }
967
968 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
969                                                struct messaging_rec *rec)
970 {
971         struct messaging_rec *result;
972         size_t fds_size = sizeof(int64_t) * rec->num_fds;
973         size_t payload_len;
974
975         payload_len = rec->buf.length + fds_size;
976         if (payload_len < rec->buf.length) {
977                 /* overflow */
978                 return NULL;
979         }
980
981         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
982                                       payload_len);
983         if (result == NULL) {
984                 return NULL;
985         }
986         *result = *rec;
987
988         /* Doesn't fail, see talloc_pooled_object */
989
990         result->buf.data = talloc_memdup(result, rec->buf.data,
991                                          rec->buf.length);
992
993         result->fds = NULL;
994         if (result->num_fds > 0) {
995                 size_t i;
996
997                 result->fds = talloc_memdup(result, rec->fds, fds_size);
998
999                 for (i=0; i<rec->num_fds; i++) {
1000                         /*
1001                          * fd's can only exist once
1002                          */
1003                         rec->fds[i] = -1;
1004                 }
1005         }
1006
1007         return result;
1008 }
1009
1010 struct messaging_filtered_read_state {
1011         struct tevent_context *ev;
1012         struct messaging_context *msg_ctx;
1013         struct messaging_dgm_fde *fde;
1014         struct messaging_ctdb_fde *cluster_fde;
1015
1016         bool (*filter)(struct messaging_rec *rec, void *private_data);
1017         void *private_data;
1018
1019         struct messaging_rec *rec;
1020 };
1021
1022 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1023                                             enum tevent_req_state req_state);
1024
1025 struct tevent_req *messaging_filtered_read_send(
1026         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1027         struct messaging_context *msg_ctx,
1028         bool (*filter)(struct messaging_rec *rec, void *private_data),
1029         void *private_data)
1030 {
1031         struct tevent_req *req;
1032         struct messaging_filtered_read_state *state;
1033         size_t new_waiters_len;
1034         bool ok;
1035
1036         req = tevent_req_create(mem_ctx, &state,
1037                                 struct messaging_filtered_read_state);
1038         if (req == NULL) {
1039                 return NULL;
1040         }
1041         state->ev = ev;
1042         state->msg_ctx = msg_ctx;
1043         state->filter = filter;
1044         state->private_data = private_data;
1045
1046         /*
1047          * We have to defer the callback here, as we might be called from
1048          * within a different tevent_context than state->ev
1049          */
1050         tevent_req_defer_callback(req, state->ev);
1051
1052         state->fde = messaging_dgm_register_tevent_context(state, ev);
1053         if (tevent_req_nomem(state->fde, req)) {
1054                 return tevent_req_post(req, ev);
1055         }
1056
1057         if (lp_clustering()) {
1058                 state->cluster_fde =
1059                         messaging_ctdb_register_tevent_context(state, ev);
1060                 if (tevent_req_nomem(state->cluster_fde, req)) {
1061                         return tevent_req_post(req, ev);
1062                 }
1063         }
1064
1065         /*
1066          * We add ourselves to the "new_waiters" array, not the "waiters"
1067          * array. If we are called from within messaging_read_done,
1068          * messaging_dispatch_rec will be in an active for-loop on
1069          * "waiters". We must be careful not to mess with this array, because
1070          * it could mean that a single event is being delivered twice.
1071          */
1072
1073         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1074
1075         if (new_waiters_len == msg_ctx->num_new_waiters) {
1076                 struct tevent_req **tmp;
1077
1078                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1079                                      struct tevent_req *, new_waiters_len+1);
1080                 if (tevent_req_nomem(tmp, req)) {
1081                         return tevent_req_post(req, ev);
1082                 }
1083                 msg_ctx->new_waiters = tmp;
1084         }
1085
1086         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1087         msg_ctx->num_new_waiters += 1;
1088         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1089
1090         ok = messaging_register_event_context(msg_ctx, ev);
1091         if (!ok) {
1092                 tevent_req_oom(req);
1093                 return tevent_req_post(req, ev);
1094         }
1095
1096         return req;
1097 }
1098
1099 static void messaging_filtered_read_cleanup(struct tevent_req *req,
1100                                             enum tevent_req_state req_state)
1101 {
1102         struct messaging_filtered_read_state *state = tevent_req_data(
1103                 req, struct messaging_filtered_read_state);
1104         struct messaging_context *msg_ctx = state->msg_ctx;
1105         size_t i;
1106         bool ok;
1107
1108         tevent_req_set_cleanup_fn(req, NULL);
1109
1110         TALLOC_FREE(state->fde);
1111         TALLOC_FREE(state->cluster_fde);
1112
1113         ok = messaging_deregister_event_context(msg_ctx, state->ev);
1114         if (!ok) {
1115                 abort();
1116         }
1117
1118         /*
1119          * Just set the [new_]waiters entry to NULL, be careful not to mess
1120          * with the other "waiters" array contents. We are often called from
1121          * within "messaging_dispatch_rec", which loops over
1122          * "waiters". Messing with the "waiters" array will mess up that
1123          * for-loop.
1124          */
1125
1126         for (i=0; i<msg_ctx->num_waiters; i++) {
1127                 if (msg_ctx->waiters[i] == req) {
1128                         msg_ctx->waiters[i] = NULL;
1129                         return;
1130                 }
1131         }
1132
1133         for (i=0; i<msg_ctx->num_new_waiters; i++) {
1134                 if (msg_ctx->new_waiters[i] == req) {
1135                         msg_ctx->new_waiters[i] = NULL;
1136                         return;
1137                 }
1138         }
1139 }
1140
1141 static void messaging_filtered_read_done(struct tevent_req *req,
1142                                          struct messaging_rec *rec)
1143 {
1144         struct messaging_filtered_read_state *state = tevent_req_data(
1145                 req, struct messaging_filtered_read_state);
1146
1147         state->rec = messaging_rec_dup(state, rec);
1148         if (tevent_req_nomem(state->rec, req)) {
1149                 return;
1150         }
1151         tevent_req_done(req);
1152 }
1153
1154 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1155                                  struct messaging_rec **presult)
1156 {
1157         struct messaging_filtered_read_state *state = tevent_req_data(
1158                 req, struct messaging_filtered_read_state);
1159         int err;
1160
1161         if (tevent_req_is_unix_error(req, &err)) {
1162                 tevent_req_received(req);
1163                 return err;
1164         }
1165         if (presult != NULL) {
1166                 *presult = talloc_move(mem_ctx, &state->rec);
1167         }
1168         tevent_req_received(req);
1169         return 0;
1170 }
1171
1172 struct messaging_read_state {
1173         uint32_t msg_type;
1174         struct messaging_rec *rec;
1175 };
1176
1177 static bool messaging_read_filter(struct messaging_rec *rec,
1178                                   void *private_data);
1179 static void messaging_read_done(struct tevent_req *subreq);
1180
1181 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1182                                        struct tevent_context *ev,
1183                                        struct messaging_context *msg,
1184                                        uint32_t msg_type)
1185 {
1186         struct tevent_req *req, *subreq;
1187         struct messaging_read_state *state;
1188
1189         req = tevent_req_create(mem_ctx, &state,
1190                                 struct messaging_read_state);
1191         if (req == NULL) {
1192                 return NULL;
1193         }
1194         state->msg_type = msg_type;
1195
1196         subreq = messaging_filtered_read_send(state, ev, msg,
1197                                               messaging_read_filter, state);
1198         if (tevent_req_nomem(subreq, req)) {
1199                 return tevent_req_post(req, ev);
1200         }
1201         tevent_req_set_callback(subreq, messaging_read_done, req);
1202         return req;
1203 }
1204
1205 static bool messaging_read_filter(struct messaging_rec *rec,
1206                                   void *private_data)
1207 {
1208         struct messaging_read_state *state = talloc_get_type_abort(
1209                 private_data, struct messaging_read_state);
1210
1211         if (rec->num_fds != 0) {
1212                 return false;
1213         }
1214
1215         return rec->msg_type == state->msg_type;
1216 }
1217
1218 static void messaging_read_done(struct tevent_req *subreq)
1219 {
1220         struct tevent_req *req = tevent_req_callback_data(
1221                 subreq, struct tevent_req);
1222         struct messaging_read_state *state = tevent_req_data(
1223                 req, struct messaging_read_state);
1224         int ret;
1225
1226         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1227         TALLOC_FREE(subreq);
1228         if (tevent_req_error(req, ret)) {
1229                 return;
1230         }
1231         tevent_req_done(req);
1232 }
1233
1234 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1235                         struct messaging_rec **presult)
1236 {
1237         struct messaging_read_state *state = tevent_req_data(
1238                 req, struct messaging_read_state);
1239         int err;
1240
1241         if (tevent_req_is_unix_error(req, &err)) {
1242                 return err;
1243         }
1244         if (presult != NULL) {
1245                 *presult = talloc_move(mem_ctx, &state->rec);
1246         }
1247         return 0;
1248 }
1249
1250 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1251 {
1252         if (msg_ctx->num_new_waiters == 0) {
1253                 return true;
1254         }
1255
1256         if (talloc_array_length(msg_ctx->waiters) <
1257             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1258                 struct tevent_req **tmp;
1259                 tmp = talloc_realloc(
1260                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1261                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1262                 if (tmp == NULL) {
1263                         DEBUG(1, ("%s: talloc failed\n", __func__));
1264                         return false;
1265                 }
1266                 msg_ctx->waiters = tmp;
1267         }
1268
1269         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1270                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1271
1272         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1273         msg_ctx->num_new_waiters = 0;
1274
1275         return true;
1276 }
1277
1278 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1279                                        struct messaging_rec *rec)
1280 {
1281         struct messaging_callback *cb, *next;
1282
1283         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1284                 size_t j;
1285
1286                 next = cb->next;
1287                 if (cb->msg_type != rec->msg_type) {
1288                         continue;
1289                 }
1290
1291                 /*
1292                  * the old style callbacks don't support fd passing
1293                  */
1294                 for (j=0; j < rec->num_fds; j++) {
1295                         int fd = rec->fds[j];
1296                         close(fd);
1297                 }
1298                 rec->num_fds = 0;
1299                 rec->fds = NULL;
1300
1301                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1302                        rec->src, &rec->buf);
1303
1304                 return true;
1305         }
1306
1307         return false;
1308 }
1309
1310 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1311                                        struct tevent_context *ev,
1312                                        struct messaging_rec *rec)
1313 {
1314         size_t i;
1315
1316         if (!messaging_append_new_waiters(msg_ctx)) {
1317                 return false;
1318         }
1319
1320         i = 0;
1321         while (i < msg_ctx->num_waiters) {
1322                 struct tevent_req *req;
1323                 struct messaging_filtered_read_state *state;
1324
1325                 req = msg_ctx->waiters[i];
1326                 if (req == NULL) {
1327                         /*
1328                          * This got cleaned up. In the meantime,
1329                          * move everything down one. We need
1330                          * to keep the order of waiters, as
1331                          * other code may depend on this.
1332                          */
1333                         ARRAY_DEL_ELEMENT(
1334                                 msg_ctx->waiters, i, msg_ctx->num_waiters);
1335                         msg_ctx->num_waiters -= 1;
1336                         continue;
1337                 }
1338
1339                 state = tevent_req_data(
1340                         req, struct messaging_filtered_read_state);
1341                 if ((ev == state->ev) &&
1342                     state->filter(rec, state->private_data)) {
1343                         messaging_filtered_read_done(req, rec);
1344                         return true;
1345                 }
1346
1347                 i += 1;
1348         }
1349
1350         return false;
1351 }
1352
1353 /*
1354   Dispatch one messaging_rec
1355 */
1356 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1357                                    struct tevent_context *ev,
1358                                    struct messaging_rec *rec)
1359 {
1360         bool consumed;
1361         size_t i;
1362
1363         if (ev == msg_ctx->event_ctx) {
1364                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1365                 if (consumed) {
1366                         return;
1367                 }
1368         }
1369
1370         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1371         if (consumed) {
1372                 return;
1373         }
1374
1375         if (ev != msg_ctx->event_ctx) {
1376                 struct iovec iov;
1377                 int fds[MAX(1, rec->num_fds)];
1378                 int ret;
1379
1380                 /*
1381                  * We've been listening on a nested event
1382                  * context. Messages need to be handled in the main
1383                  * event context, so post to ourselves
1384                  */
1385
1386                 iov.iov_base = rec->buf.data;
1387                 iov.iov_len = rec->buf.length;
1388
1389                 for (i=0; i<rec->num_fds; i++) {
1390                         fds[i] = rec->fds[i];
1391                 }
1392
1393                 ret = messaging_post_self(
1394                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1395                         &iov, 1, fds, rec->num_fds);
1396                 if (ret == 0) {
1397                         return;
1398                 }
1399         }
1400 }
1401
1402 static int mess_parent_dgm_cleanup(void *private_data);
1403 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1404
1405 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1406 {
1407         struct tevent_req *req;
1408
1409         req = background_job_send(
1410                 msg, msg->event_ctx, msg, NULL, 0,
1411                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1412                             60*15),
1413                 mess_parent_dgm_cleanup, msg);
1414         if (req == NULL) {
1415                 DBG_WARNING("background_job_send failed\n");
1416                 return false;
1417         }
1418         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1419         return true;
1420 }
1421
1422 static int mess_parent_dgm_cleanup(void *private_data)
1423 {
1424         int ret;
1425
1426         ret = messaging_dgm_wipe();
1427         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1428                    ret ? strerror(ret) : "ok"));
1429         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1430                            60*15);
1431 }
1432
1433 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1434 {
1435         struct messaging_context *msg = tevent_req_callback_data(
1436                 req, struct messaging_context);
1437         NTSTATUS status;
1438
1439         status = background_job_recv(req);
1440         TALLOC_FREE(req);
1441         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1442                   nt_errstr(status)));
1443
1444         req = background_job_send(
1445                 msg, msg->event_ctx, msg, NULL, 0,
1446                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1447                             60*15),
1448                 mess_parent_dgm_cleanup, msg);
1449         if (req == NULL) {
1450                 DEBUG(1, ("background_job_send failed\n"));
1451                 return;
1452         }
1453         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1454 }
1455
1456 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1457 {
1458         int ret;
1459
1460         if (pid == 0) {
1461                 ret = messaging_dgm_wipe();
1462         } else {
1463                 ret = messaging_dgm_cleanup(pid);
1464         }
1465
1466         return ret;
1467 }
1468
1469 struct tevent_context *messaging_tevent_context(
1470         struct messaging_context *msg_ctx)
1471 {
1472         return msg_ctx->event_ctx;
1473 }
1474
1475 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1476 {
1477         return msg_ctx->names_db;
1478 }
1479
1480 /** @} **/