messaging: make messaging_rec_create public
[sfrench/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
61
62 struct messaging_callback {
63         struct messaging_callback *prev, *next;
64         uint32_t msg_type;
65         void (*fn)(struct messaging_context *msg, void *private_data, 
66                    uint32_t msg_type, 
67                    struct server_id server_id, DATA_BLOB *data);
68         void *private_data;
69 };
70
71 struct messaging_context {
72         struct server_id id;
73         struct tevent_context *event_ctx;
74         struct messaging_callback *callbacks;
75
76         struct tevent_req **new_waiters;
77         size_t num_new_waiters;
78
79         struct tevent_req **waiters;
80         size_t num_waiters;
81
82         void *msg_dgm_ref;
83         struct messaging_backend *remote;
84
85         struct server_id_db *names_db;
86 };
87
88 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
89                                                struct messaging_rec *rec);
90 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
91                                    struct tevent_context *ev,
92                                    struct messaging_rec *rec);
93
94 /****************************************************************************
95  A useful function for testing the message system.
96 ****************************************************************************/
97
98 static void ping_message(struct messaging_context *msg_ctx,
99                          void *private_data,
100                          uint32_t msg_type,
101                          struct server_id src,
102                          DATA_BLOB *data)
103 {
104         struct server_id_buf idbuf;
105
106         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
107                   server_id_str_buf(src, &idbuf), (int)data->length,
108                   data->data ? (char *)data->data : ""));
109
110         messaging_send(msg_ctx, src, MSG_PONG, data);
111 }
112
113 struct messaging_rec *messaging_rec_create(
114         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
115         uint32_t msg_type, const struct iovec *iov, int iovlen,
116         const int *fds, size_t num_fds)
117 {
118         ssize_t buflen;
119         uint8_t *buf;
120         struct messaging_rec *result;
121
122         if (num_fds > INT8_MAX) {
123                 return NULL;
124         }
125
126         buflen = iov_buflen(iov, iovlen);
127         if (buflen == -1) {
128                 return NULL;
129         }
130         buf = talloc_array(mem_ctx, uint8_t, buflen);
131         if (buf == NULL) {
132                 return NULL;
133         }
134         iov_buf(iov, iovlen, buf, buflen);
135
136         {
137                 struct messaging_rec rec;
138                 int64_t fds64[num_fds];
139                 size_t i;
140
141                 for (i=0; i<num_fds; i++) {
142                         fds64[i] = fds[i];
143                 }
144
145                 rec = (struct messaging_rec) {
146                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
147                         .src = src, .dest = dst,
148                         .buf.data = buf, .buf.length = buflen,
149                         .num_fds = num_fds, .fds = fds64,
150                 };
151
152                 result = messaging_rec_dup(mem_ctx, &rec);
153         }
154
155         TALLOC_FREE(buf);
156
157         return result;
158 }
159
160 static void messaging_recv_cb(struct tevent_context *ev,
161                               const uint8_t *msg, size_t msg_len,
162                               int *fds, size_t num_fds,
163                               void *private_data)
164 {
165         struct messaging_context *msg_ctx = talloc_get_type_abort(
166                 private_data, struct messaging_context);
167         struct server_id_buf idbuf;
168         struct messaging_rec rec;
169         int64_t fds64[MIN(num_fds, INT8_MAX)];
170         size_t i;
171
172         if (msg_len < MESSAGE_HDR_LENGTH) {
173                 DBG_WARNING("message too short: %zu\n", msg_len);
174                 goto close_fail;
175         }
176
177         if (num_fds > INT8_MAX) {
178                 DBG_WARNING("too many fds: %zu\n", num_fds);
179                 goto close_fail;
180         }
181
182         /*
183          * "consume" the fds by copying them and setting
184          * the original variable to -1
185          */
186         for (i=0; i < num_fds; i++) {
187                 fds64[i] = fds[i];
188                 fds[i] = -1;
189         }
190
191         rec = (struct messaging_rec) {
192                 .msg_version = MESSAGE_VERSION,
193                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
194                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
195                 .num_fds = num_fds,
196                 .fds = fds64,
197         };
198
199         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
200
201         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
202                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
203                   server_id_str_buf(rec.src, &idbuf));
204
205         messaging_dispatch_rec(msg_ctx, ev, &rec);
206         return;
207
208 close_fail:
209         for (i=0; i < num_fds; i++) {
210                 close(fds[i]);
211         }
212 }
213
214 static int messaging_context_destructor(struct messaging_context *ctx)
215 {
216         size_t i;
217
218         for (i=0; i<ctx->num_new_waiters; i++) {
219                 if (ctx->new_waiters[i] != NULL) {
220                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
221                         ctx->new_waiters[i] = NULL;
222                 }
223         }
224         for (i=0; i<ctx->num_waiters; i++) {
225                 if (ctx->waiters[i] != NULL) {
226                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
227                         ctx->waiters[i] = NULL;
228                 }
229         }
230
231         return 0;
232 }
233
234 static const char *private_path(const char *name)
235 {
236         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
237 }
238
239 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
240                                         struct tevent_context *ev,
241                                         struct messaging_context **pmsg_ctx)
242 {
243         TALLOC_CTX *frame;
244         struct messaging_context *ctx;
245         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
246         int ret;
247         const char *lck_path;
248         const char *priv_path;
249         bool ok;
250
251         lck_path = lock_path("msg.lock");
252         if (lck_path == NULL) {
253                 return NT_STATUS_NO_MEMORY;
254         }
255
256         ok = directory_create_or_exist_strict(lck_path,
257                                               sec_initial_uid(),
258                                               0755);
259         if (!ok) {
260                 DBG_DEBUG("Could not create lock directory: %s\n",
261                           strerror(errno));
262                 return NT_STATUS_ACCESS_DENIED;
263         }
264
265         priv_path = private_path("msg.sock");
266         if (priv_path == NULL) {
267                 return NT_STATUS_NO_MEMORY;
268         }
269
270         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
271                                               0700);
272         if (!ok) {
273                 DBG_DEBUG("Could not create msg directory: %s\n",
274                           strerror(errno));
275                 return NT_STATUS_ACCESS_DENIED;
276         }
277
278         frame = talloc_stackframe();
279         if (frame == NULL) {
280                 return NT_STATUS_NO_MEMORY;
281         }
282
283         ctx = talloc_zero(frame, struct messaging_context);
284         if (ctx == NULL) {
285                 status = NT_STATUS_NO_MEMORY;
286                 goto done;
287         }
288
289         ctx->id = (struct server_id) {
290                 .pid = getpid(), .vnn = NONCLUSTER_VNN
291         };
292
293         ctx->event_ctx = ev;
294
295         sec_init();
296
297         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
298                                              ctx->event_ctx,
299                                              &ctx->id.unique_id,
300                                              priv_path,
301                                              lck_path,
302                                              messaging_recv_cb,
303                                              ctx,
304                                              &ret);
305         if (ctx->msg_dgm_ref == NULL) {
306                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
307                 status = map_nt_error_from_unix(ret);
308                 goto done;
309         }
310         talloc_set_destructor(ctx, messaging_context_destructor);
311
312         if (lp_clustering()) {
313                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
314
315                 if (ret != 0) {
316                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
317                                   strerror(ret)));
318                         status = map_nt_error_from_unix(ret);
319                         goto done;
320                 }
321         }
322         ctx->id.vnn = get_my_vnn();
323
324         ctx->names_db = server_id_db_init(ctx,
325                                           ctx->id,
326                                           lp_lock_directory(),
327                                           0,
328                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
329         if (ctx->names_db == NULL) {
330                 DBG_DEBUG("server_id_db_init failed\n");
331                 status = NT_STATUS_NO_MEMORY;
332                 goto done;
333         }
334
335         messaging_register(ctx, NULL, MSG_PING, ping_message);
336
337         /* Register some debugging related messages */
338
339         register_msg_pool_usage(ctx);
340         register_dmalloc_msgs(ctx);
341         debug_register_msgs(ctx);
342
343         {
344                 struct server_id_buf tmp;
345                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
346         }
347
348         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
349
350         status = NT_STATUS_OK;
351 done:
352         TALLOC_FREE(frame);
353
354         return status;
355 }
356
357 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
358                                          struct tevent_context *ev)
359 {
360         struct messaging_context *ctx = NULL;
361         NTSTATUS status;
362
363         status = messaging_init_internal(mem_ctx,
364                                          ev,
365                                          &ctx);
366         if (!NT_STATUS_IS_OK(status)) {
367                 return NULL;
368         }
369
370         return ctx;
371 }
372
373 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
374                                struct tevent_context *ev,
375                                struct messaging_context **pmsg_ctx)
376 {
377         return messaging_init_internal(mem_ctx,
378                                         ev,
379                                         pmsg_ctx);
380 }
381
382 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
383 {
384         return msg_ctx->id;
385 }
386
387 /*
388  * re-init after a fork
389  */
390 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
391 {
392         int ret;
393         char *lck_path;
394
395         TALLOC_FREE(msg_ctx->msg_dgm_ref);
396
397         msg_ctx->id = (struct server_id) {
398                 .pid = getpid(), .vnn = msg_ctx->id.vnn
399         };
400
401         lck_path = lock_path("msg.lock");
402         if (lck_path == NULL) {
403                 return NT_STATUS_NO_MEMORY;
404         }
405
406         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
407                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
408                 private_path("msg.sock"), lck_path,
409                 messaging_recv_cb, msg_ctx, &ret);
410
411         if (msg_ctx->msg_dgm_ref == NULL) {
412                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
413                 return map_nt_error_from_unix(ret);
414         }
415
416         if (lp_clustering()) {
417                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
418                                              msg_ctx->remote);
419
420                 if (ret != 0) {
421                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
422                                   strerror(ret)));
423                         return map_nt_error_from_unix(ret);
424                 }
425         }
426
427         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
428
429         return NT_STATUS_OK;
430 }
431
432
433 /*
434  * Register a dispatch function for a particular message type. Allow multiple
435  * registrants
436 */
437 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
438                             void *private_data,
439                             uint32_t msg_type,
440                             void (*fn)(struct messaging_context *msg,
441                                        void *private_data, 
442                                        uint32_t msg_type, 
443                                        struct server_id server_id,
444                                        DATA_BLOB *data))
445 {
446         struct messaging_callback *cb;
447
448         DEBUG(5, ("Registering messaging pointer for type %u - "
449                   "private_data=%p\n",
450                   (unsigned)msg_type, private_data));
451
452         /*
453          * Only one callback per type
454          */
455
456         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
457                 /* we allow a second registration of the same message
458                    type if it has a different private pointer. This is
459                    needed in, for example, the internal notify code,
460                    which creates a new notify context for each tree
461                    connect, and expects to receive messages to each of
462                    them. */
463                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
464                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
465                                   (unsigned)msg_type, private_data));
466                         cb->fn = fn;
467                         cb->private_data = private_data;
468                         return NT_STATUS_OK;
469                 }
470         }
471
472         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
473                 return NT_STATUS_NO_MEMORY;
474         }
475
476         cb->msg_type = msg_type;
477         cb->fn = fn;
478         cb->private_data = private_data;
479
480         DLIST_ADD(msg_ctx->callbacks, cb);
481         return NT_STATUS_OK;
482 }
483
484 /*
485   De-register the function for a particular message type.
486 */
487 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
488                           void *private_data)
489 {
490         struct messaging_callback *cb, *next;
491
492         for (cb = ctx->callbacks; cb; cb = next) {
493                 next = cb->next;
494                 if ((cb->msg_type == msg_type)
495                     && (cb->private_data == private_data)) {
496                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
497                                   (unsigned)msg_type, private_data));
498                         DLIST_REMOVE(ctx->callbacks, cb);
499                         TALLOC_FREE(cb);
500                 }
501         }
502 }
503
504 /*
505   Send a message to a particular server
506 */
507 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
508                         struct server_id server, uint32_t msg_type,
509                         const DATA_BLOB *data)
510 {
511         struct iovec iov = {0};
512
513         if (data != NULL) {
514                 iov.iov_base = data->data;
515                 iov.iov_len = data->length;
516         };
517
518         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
519 }
520
521 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
522                             struct server_id server, uint32_t msg_type,
523                             const uint8_t *buf, size_t len)
524 {
525         DATA_BLOB blob = data_blob_const(buf, len);
526         return messaging_send(msg_ctx, server, msg_type, &blob);
527 }
528
529 struct messaging_post_state {
530         struct messaging_context *msg_ctx;
531         struct messaging_rec *rec;
532 };
533
534 static void messaging_post_handler(struct tevent_context *ev,
535                                    struct tevent_immediate *ti,
536                                    void *private_data);
537
538 static int messaging_post_self(struct messaging_context *msg_ctx,
539                                struct server_id src, struct server_id dst,
540                                uint32_t msg_type,
541                                const struct iovec *iov, int iovlen,
542                                const int *fds, size_t num_fds)
543 {
544         struct tevent_immediate *ti;
545         struct messaging_post_state *state;
546
547         state = talloc(msg_ctx, struct messaging_post_state);
548         if (state == NULL) {
549                 return ENOMEM;
550         }
551         state->msg_ctx = msg_ctx;
552
553         ti = tevent_create_immediate(state);
554         if (ti == NULL) {
555                 goto fail;
556         }
557         state->rec = messaging_rec_create(
558                 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
559         if (state->rec == NULL) {
560                 goto fail;
561         }
562
563         tevent_schedule_immediate(ti, msg_ctx->event_ctx,
564                                   messaging_post_handler, state);
565         return 0;
566
567 fail:
568         TALLOC_FREE(state);
569         return ENOMEM;
570 }
571
572 static void messaging_post_handler(struct tevent_context *ev,
573                                    struct tevent_immediate *ti,
574                                    void *private_data)
575 {
576         struct messaging_post_state *state = talloc_get_type_abort(
577                 private_data, struct messaging_post_state);
578         messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
579         TALLOC_FREE(state);
580 }
581
582 int messaging_send_iov_from(struct messaging_context *msg_ctx,
583                             struct server_id src, struct server_id dst,
584                             uint32_t msg_type,
585                             const struct iovec *iov, int iovlen,
586                             const int *fds, size_t num_fds)
587 {
588         int ret;
589         uint8_t hdr[MESSAGE_HDR_LENGTH];
590         struct iovec iov2[iovlen+1];
591
592         if (server_id_is_disconnected(&dst)) {
593                 return EINVAL;
594         }
595
596         if (num_fds > INT8_MAX) {
597                 return EINVAL;
598         }
599
600         if (dst.vnn != msg_ctx->id.vnn) {
601                 if (num_fds > 0) {
602                         return ENOSYS;
603                 }
604
605                 ret = msg_ctx->remote->send_fn(src, dst,
606                                                msg_type, iov, iovlen,
607                                                NULL, 0,
608                                                msg_ctx->remote);
609                 return ret;
610         }
611
612         if (server_id_equal(&dst, &msg_ctx->id)) {
613                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
614                                           iov, iovlen, fds, num_fds);
615                 return ret;
616         }
617
618         message_hdr_put(hdr, msg_type, src, dst);
619         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
620         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
621
622         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
623
624         if (ret == EACCES) {
625                 become_root();
626                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
627                                          fds, num_fds);
628                 unbecome_root();
629         }
630
631         if (ret == ECONNREFUSED) {
632                 /*
633                  * Linux returns this when a socket exists in the file
634                  * system without a listening process. This is not
635                  * documented in susv4 or the linux manpages, but it's
636                  * easily testable. For the higher levels this is the
637                  * same as "destination does not exist"
638                  */
639                 ret = ENOENT;
640         }
641
642         return ret;
643 }
644
645 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
646                             struct server_id dst, uint32_t msg_type,
647                             const struct iovec *iov, int iovlen,
648                             const int *fds, size_t num_fds)
649 {
650         int ret;
651
652         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
653                                       iov, iovlen, fds, num_fds);
654         if (ret != 0) {
655                 return map_nt_error_from_unix(ret);
656         }
657         return NT_STATUS_OK;
658 }
659
660 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
661                                                struct messaging_rec *rec)
662 {
663         struct messaging_rec *result;
664         size_t fds_size = sizeof(int64_t) * rec->num_fds;
665         size_t payload_len;
666
667         payload_len = rec->buf.length + fds_size;
668         if (payload_len < rec->buf.length) {
669                 /* overflow */
670                 return NULL;
671         }
672
673         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
674                                       payload_len);
675         if (result == NULL) {
676                 return NULL;
677         }
678         *result = *rec;
679
680         /* Doesn't fail, see talloc_pooled_object */
681
682         result->buf.data = talloc_memdup(result, rec->buf.data,
683                                          rec->buf.length);
684
685         result->fds = NULL;
686         if (result->num_fds > 0) {
687                 result->fds = talloc_memdup(result, rec->fds, fds_size);
688         }
689
690         return result;
691 }
692
693 struct messaging_filtered_read_state {
694         struct tevent_context *ev;
695         struct messaging_context *msg_ctx;
696         struct messaging_dgm_fde *fde;
697
698         bool (*filter)(struct messaging_rec *rec, void *private_data);
699         void *private_data;
700
701         struct messaging_rec *rec;
702 };
703
704 static void messaging_filtered_read_cleanup(struct tevent_req *req,
705                                             enum tevent_req_state req_state);
706
707 struct tevent_req *messaging_filtered_read_send(
708         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
709         struct messaging_context *msg_ctx,
710         bool (*filter)(struct messaging_rec *rec, void *private_data),
711         void *private_data)
712 {
713         struct tevent_req *req;
714         struct messaging_filtered_read_state *state;
715         size_t new_waiters_len;
716
717         req = tevent_req_create(mem_ctx, &state,
718                                 struct messaging_filtered_read_state);
719         if (req == NULL) {
720                 return NULL;
721         }
722         state->ev = ev;
723         state->msg_ctx = msg_ctx;
724         state->filter = filter;
725         state->private_data = private_data;
726
727         /*
728          * We have to defer the callback here, as we might be called from
729          * within a different tevent_context than state->ev
730          */
731         tevent_req_defer_callback(req, state->ev);
732
733         state->fde = messaging_dgm_register_tevent_context(state, ev);
734         if (tevent_req_nomem(state->fde, req)) {
735                 return tevent_req_post(req, ev);
736         }
737
738         /*
739          * We add ourselves to the "new_waiters" array, not the "waiters"
740          * array. If we are called from within messaging_read_done,
741          * messaging_dispatch_rec will be in an active for-loop on
742          * "waiters". We must be careful not to mess with this array, because
743          * it could mean that a single event is being delivered twice.
744          */
745
746         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
747
748         if (new_waiters_len == msg_ctx->num_new_waiters) {
749                 struct tevent_req **tmp;
750
751                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
752                                      struct tevent_req *, new_waiters_len+1);
753                 if (tevent_req_nomem(tmp, req)) {
754                         return tevent_req_post(req, ev);
755                 }
756                 msg_ctx->new_waiters = tmp;
757         }
758
759         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
760         msg_ctx->num_new_waiters += 1;
761         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
762
763         return req;
764 }
765
766 static void messaging_filtered_read_cleanup(struct tevent_req *req,
767                                             enum tevent_req_state req_state)
768 {
769         struct messaging_filtered_read_state *state = tevent_req_data(
770                 req, struct messaging_filtered_read_state);
771         struct messaging_context *msg_ctx = state->msg_ctx;
772         size_t i;
773
774         tevent_req_set_cleanup_fn(req, NULL);
775
776         TALLOC_FREE(state->fde);
777
778         /*
779          * Just set the [new_]waiters entry to NULL, be careful not to mess
780          * with the other "waiters" array contents. We are often called from
781          * within "messaging_dispatch_rec", which loops over
782          * "waiters". Messing with the "waiters" array will mess up that
783          * for-loop.
784          */
785
786         for (i=0; i<msg_ctx->num_waiters; i++) {
787                 if (msg_ctx->waiters[i] == req) {
788                         msg_ctx->waiters[i] = NULL;
789                         return;
790                 }
791         }
792
793         for (i=0; i<msg_ctx->num_new_waiters; i++) {
794                 if (msg_ctx->new_waiters[i] == req) {
795                         msg_ctx->new_waiters[i] = NULL;
796                         return;
797                 }
798         }
799 }
800
801 static void messaging_filtered_read_done(struct tevent_req *req,
802                                          struct messaging_rec *rec)
803 {
804         struct messaging_filtered_read_state *state = tevent_req_data(
805                 req, struct messaging_filtered_read_state);
806
807         state->rec = messaging_rec_dup(state, rec);
808         if (tevent_req_nomem(state->rec, req)) {
809                 return;
810         }
811         tevent_req_done(req);
812 }
813
814 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
815                                  struct messaging_rec **presult)
816 {
817         struct messaging_filtered_read_state *state = tevent_req_data(
818                 req, struct messaging_filtered_read_state);
819         int err;
820
821         if (tevent_req_is_unix_error(req, &err)) {
822                 tevent_req_received(req);
823                 return err;
824         }
825         if (presult != NULL) {
826                 *presult = talloc_move(mem_ctx, &state->rec);
827         }
828         return 0;
829 }
830
831 struct messaging_read_state {
832         uint32_t msg_type;
833         struct messaging_rec *rec;
834 };
835
836 static bool messaging_read_filter(struct messaging_rec *rec,
837                                   void *private_data);
838 static void messaging_read_done(struct tevent_req *subreq);
839
840 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
841                                        struct tevent_context *ev,
842                                        struct messaging_context *msg,
843                                        uint32_t msg_type)
844 {
845         struct tevent_req *req, *subreq;
846         struct messaging_read_state *state;
847
848         req = tevent_req_create(mem_ctx, &state,
849                                 struct messaging_read_state);
850         if (req == NULL) {
851                 return NULL;
852         }
853         state->msg_type = msg_type;
854
855         subreq = messaging_filtered_read_send(state, ev, msg,
856                                               messaging_read_filter, state);
857         if (tevent_req_nomem(subreq, req)) {
858                 return tevent_req_post(req, ev);
859         }
860         tevent_req_set_callback(subreq, messaging_read_done, req);
861         return req;
862 }
863
864 static bool messaging_read_filter(struct messaging_rec *rec,
865                                   void *private_data)
866 {
867         struct messaging_read_state *state = talloc_get_type_abort(
868                 private_data, struct messaging_read_state);
869
870         if (rec->num_fds != 0) {
871                 return false;
872         }
873
874         return rec->msg_type == state->msg_type;
875 }
876
877 static void messaging_read_done(struct tevent_req *subreq)
878 {
879         struct tevent_req *req = tevent_req_callback_data(
880                 subreq, struct tevent_req);
881         struct messaging_read_state *state = tevent_req_data(
882                 req, struct messaging_read_state);
883         int ret;
884
885         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
886         TALLOC_FREE(subreq);
887         if (tevent_req_error(req, ret)) {
888                 return;
889         }
890         tevent_req_done(req);
891 }
892
893 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
894                         struct messaging_rec **presult)
895 {
896         struct messaging_read_state *state = tevent_req_data(
897                 req, struct messaging_read_state);
898         int err;
899
900         if (tevent_req_is_unix_error(req, &err)) {
901                 return err;
902         }
903         if (presult != NULL) {
904                 *presult = talloc_move(mem_ctx, &state->rec);
905         }
906         return 0;
907 }
908
909 struct messaging_handler_state {
910         struct tevent_context *ev;
911         struct messaging_context *msg_ctx;
912         uint32_t msg_type;
913         bool (*handler)(struct messaging_context *msg_ctx,
914                         struct messaging_rec **rec, void *private_data);
915         void *private_data;
916 };
917
918 static void messaging_handler_got_msg(struct tevent_req *subreq);
919
920 struct tevent_req *messaging_handler_send(
921         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
922         struct messaging_context *msg_ctx, uint32_t msg_type,
923         bool (*handler)(struct messaging_context *msg_ctx,
924                         struct messaging_rec **rec, void *private_data),
925         void *private_data)
926 {
927         struct tevent_req *req, *subreq;
928         struct messaging_handler_state *state;
929
930         req = tevent_req_create(mem_ctx, &state,
931                                 struct messaging_handler_state);
932         if (req == NULL) {
933                 return NULL;
934         }
935         state->ev = ev;
936         state->msg_ctx = msg_ctx;
937         state->msg_type = msg_type;
938         state->handler = handler;
939         state->private_data = private_data;
940
941         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
942                                      state->msg_type);
943         if (tevent_req_nomem(subreq, req)) {
944                 return tevent_req_post(req, ev);
945         }
946         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
947         return req;
948 }
949
950 static void messaging_handler_got_msg(struct tevent_req *subreq)
951 {
952         struct tevent_req *req = tevent_req_callback_data(
953                 subreq, struct tevent_req);
954         struct messaging_handler_state *state = tevent_req_data(
955                 req, struct messaging_handler_state);
956         struct messaging_rec *rec;
957         int ret;
958         bool ok;
959
960         ret = messaging_read_recv(subreq, state, &rec);
961         TALLOC_FREE(subreq);
962         if (tevent_req_error(req, ret)) {
963                 return;
964         }
965
966         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
967                                      state->msg_type);
968         if (tevent_req_nomem(subreq, req)) {
969                 return;
970         }
971         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
972
973         ok = state->handler(state->msg_ctx, &rec, state->private_data);
974         TALLOC_FREE(rec);
975         if (ok) {
976                 /*
977                  * Next round
978                  */
979                 return;
980         }
981         TALLOC_FREE(subreq);
982         tevent_req_done(req);
983 }
984
985 int messaging_handler_recv(struct tevent_req *req)
986 {
987         return tevent_req_simple_recv_unix(req);
988 }
989
990 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
991 {
992         if (msg_ctx->num_new_waiters == 0) {
993                 return true;
994         }
995
996         if (talloc_array_length(msg_ctx->waiters) <
997             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
998                 struct tevent_req **tmp;
999                 tmp = talloc_realloc(
1000                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1001                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1002                 if (tmp == NULL) {
1003                         DEBUG(1, ("%s: talloc failed\n", __func__));
1004                         return false;
1005                 }
1006                 msg_ctx->waiters = tmp;
1007         }
1008
1009         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1010                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1011
1012         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1013         msg_ctx->num_new_waiters = 0;
1014
1015         return true;
1016 }
1017
1018 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1019                                        struct messaging_rec *rec)
1020 {
1021         struct messaging_callback *cb, *next;
1022
1023         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1024                 size_t j;
1025
1026                 next = cb->next;
1027                 if (cb->msg_type != rec->msg_type) {
1028                         continue;
1029                 }
1030
1031                 /*
1032                  * the old style callbacks don't support fd passing
1033                  */
1034                 for (j=0; j < rec->num_fds; j++) {
1035                         int fd = rec->fds[j];
1036                         close(fd);
1037                 }
1038                 rec->num_fds = 0;
1039                 rec->fds = NULL;
1040
1041                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1042                        rec->src, &rec->buf);
1043
1044                 return true;
1045         }
1046
1047         return false;
1048 }
1049
1050 /*
1051   Dispatch one messaging_rec
1052 */
1053 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1054                                    struct tevent_context *ev,
1055                                    struct messaging_rec *rec)
1056 {
1057         size_t i;
1058         bool consumed;
1059
1060         if (ev == msg_ctx->event_ctx) {
1061                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1062                 if (consumed) {
1063                         return;
1064                 }
1065         }
1066
1067         if (!messaging_append_new_waiters(msg_ctx)) {
1068                 size_t j;
1069                 for (j=0; j < rec->num_fds; j++) {
1070                         int fd = rec->fds[j];
1071                         close(fd);
1072                 }
1073                 rec->num_fds = 0;
1074                 rec->fds = NULL;
1075                 return;
1076         }
1077
1078         i = 0;
1079         while (i < msg_ctx->num_waiters) {
1080                 struct tevent_req *req;
1081                 struct messaging_filtered_read_state *state;
1082
1083                 req = msg_ctx->waiters[i];
1084                 if (req == NULL) {
1085                         /*
1086                          * This got cleaned up. In the meantime,
1087                          * move everything down one. We need
1088                          * to keep the order of waiters, as
1089                          * other code may depend on this.
1090                          */
1091                         if (i < msg_ctx->num_waiters - 1) {
1092                                 memmove(&msg_ctx->waiters[i],
1093                                         &msg_ctx->waiters[i+1],
1094                                         sizeof(struct tevent_req *) *
1095                                             (msg_ctx->num_waiters - i - 1));
1096                         }
1097                         msg_ctx->num_waiters -= 1;
1098                         continue;
1099                 }
1100
1101                 state = tevent_req_data(
1102                         req, struct messaging_filtered_read_state);
1103                 if ((ev == state->ev) &&
1104                     state->filter(rec, state->private_data)) {
1105                         messaging_filtered_read_done(req, rec);
1106                         return;
1107                 }
1108
1109                 i += 1;
1110         }
1111
1112         if (ev != msg_ctx->event_ctx) {
1113                 struct iovec iov;
1114                 int fds[rec->num_fds];
1115                 int ret;
1116
1117                 /*
1118                  * We've been listening on a nested event
1119                  * context. Messages need to be handled in the main
1120                  * event context, so post to ourselves
1121                  */
1122
1123                 iov.iov_base = rec->buf.data;
1124                 iov.iov_len = rec->buf.length;
1125
1126                 for (i=0; i<rec->num_fds; i++) {
1127                         fds[i] = rec->fds[i];
1128                 }
1129
1130                 ret = messaging_post_self(
1131                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1132                         &iov, 1, fds, rec->num_fds);
1133                 if (ret == 0) {
1134                         return;
1135                 }
1136         }
1137
1138         /*
1139          * If the fd-array isn't used, just close it.
1140          */
1141         for (i=0; i < rec->num_fds; i++) {
1142                 int fd = rec->fds[i];
1143                 close(fd);
1144         }
1145         rec->num_fds = 0;
1146         rec->fds = NULL;
1147 }
1148
1149 static int mess_parent_dgm_cleanup(void *private_data);
1150 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1151
1152 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1153 {
1154         struct tevent_req *req;
1155
1156         req = background_job_send(
1157                 msg, msg->event_ctx, msg, NULL, 0,
1158                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1159                             60*15),
1160                 mess_parent_dgm_cleanup, msg);
1161         if (req == NULL) {
1162                 return false;
1163         }
1164         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1165         return true;
1166 }
1167
1168 static int mess_parent_dgm_cleanup(void *private_data)
1169 {
1170         int ret;
1171
1172         ret = messaging_dgm_wipe();
1173         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1174                    ret ? strerror(ret) : "ok"));
1175         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1176                            60*15);
1177 }
1178
1179 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1180 {
1181         struct messaging_context *msg = tevent_req_callback_data(
1182                 req, struct messaging_context);
1183         NTSTATUS status;
1184
1185         status = background_job_recv(req);
1186         TALLOC_FREE(req);
1187         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1188                   nt_errstr(status)));
1189
1190         req = background_job_send(
1191                 msg, msg->event_ctx, msg, NULL, 0,
1192                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1193                             60*15),
1194                 mess_parent_dgm_cleanup, msg);
1195         if (req == NULL) {
1196                 DEBUG(1, ("background_job_send failed\n"));
1197                 return;
1198         }
1199         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1200 }
1201
1202 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1203 {
1204         int ret;
1205
1206         if (pid == 0) {
1207                 ret = messaging_dgm_wipe();
1208         } else {
1209                 ret = messaging_dgm_cleanup(pid);
1210         }
1211
1212         return ret;
1213 }
1214
1215 struct tevent_context *messaging_tevent_context(
1216         struct messaging_context *msg_ctx)
1217 {
1218         return msg_ctx->event_ctx;
1219 }
1220
1221 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1222 {
1223         return msg_ctx->names_db;
1224 }
1225
1226 /** @} **/