lib: Push down unique generation one level
[vlendec/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32_t msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         struct server_id_buf idbuf;
217         struct messaging_rec rec;
218         int64_t fds64[MIN(num_fds, INT8_MAX)];
219         size_t i;
220
221         if (msg_len < MESSAGE_HDR_LENGTH) {
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 goto close_fail;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228                 goto close_fail;
229         }
230
231         /*
232          * "consume" the fds by copying them and setting
233          * the original variable to -1
234          */
235         for (i=0; i < num_fds; i++) {
236                 fds64[i] = fds[i];
237                 fds[i] = -1;
238         }
239
240         rec = (struct messaging_rec) {
241                 .msg_version = MESSAGE_VERSION,
242                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244                 .num_fds = num_fds,
245                 .fds = fds64,
246         };
247
248         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
249
250         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251                    __func__, (unsigned)rec.msg_type,
252                    (unsigned)rec.buf.length,
253                    (unsigned)num_fds,
254                    server_id_str_buf(rec.src, &idbuf)));
255
256         messaging_dispatch_rec(msg_ctx, &rec);
257         return;
258
259 close_fail:
260         for (i=0; i < num_fds; i++) {
261                 close(fds[i]);
262         }
263 }
264
265 static int messaging_context_destructor(struct messaging_context *ctx)
266 {
267         unsigned i;
268
269         for (i=0; i<ctx->num_new_waiters; i++) {
270                 if (ctx->new_waiters[i] != NULL) {
271                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272                         ctx->new_waiters[i] = NULL;
273                 }
274         }
275         for (i=0; i<ctx->num_waiters; i++) {
276                 if (ctx->waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278                         ctx->waiters[i] = NULL;
279                 }
280         }
281
282         return 0;
283 }
284
285 static const char *private_path(const char *name)
286 {
287         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
288 }
289
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
291                                          struct tevent_context *ev)
292 {
293         struct messaging_context *ctx;
294         int ret;
295         const char *lck_path;
296         const char *priv_path;
297         bool ok;
298
299         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
300                 return NULL;
301         }
302
303         ctx->id = (struct server_id) {
304                 .pid = getpid(), .vnn = NONCLUSTER_VNN
305         };
306
307         ctx->event_ctx = ev;
308
309         sec_init();
310
311         lck_path = lock_path("msg.lock");
312         if (lck_path == NULL) {
313                 TALLOC_FREE(ctx);
314                 return NULL;
315         }
316
317         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
318                                               0755);
319         if (!ok) {
320                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
321                            __func__, strerror(errno)));
322                 TALLOC_FREE(ctx);
323                 return NULL;
324         }
325
326         priv_path = private_path("msg.sock");
327         if (priv_path == NULL) {
328                 TALLOC_FREE(ctx);
329                 return NULL;
330         }
331
332         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
333                                               0700);
334         if (!ok) {
335                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
336                            __func__, strerror(errno)));
337                 TALLOC_FREE(ctx);
338                 return NULL;
339         }
340
341         ctx->msg_dgm_ref = messaging_dgm_ref(
342                 ctx, ctx->event_ctx, &ctx->id.unique_id,
343                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
344
345         if (ctx->msg_dgm_ref == NULL) {
346                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
347                 TALLOC_FREE(ctx);
348                 return NULL;
349         }
350
351         talloc_set_destructor(ctx, messaging_context_destructor);
352
353         if (lp_clustering()) {
354                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
355
356                 if (ret != 0) {
357                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
358                                   strerror(ret)));
359                         TALLOC_FREE(ctx);
360                         return NULL;
361                 }
362         }
363         ctx->id.vnn = get_my_vnn();
364
365         ctx->names_db = server_id_db_init(
366                 ctx, ctx->id, lp_lock_directory(), 0,
367                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
368         if (ctx->names_db == NULL) {
369                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
370                 TALLOC_FREE(ctx);
371                 return NULL;
372         }
373
374         messaging_register(ctx, NULL, MSG_PING, ping_message);
375
376         /* Register some debugging related messages */
377
378         register_msg_pool_usage(ctx);
379         register_dmalloc_msgs(ctx);
380         debug_register_msgs(ctx);
381
382         return ctx;
383 }
384
385 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
386 {
387         return msg_ctx->id;
388 }
389
390 /*
391  * re-init after a fork
392  */
393 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
394 {
395         int ret;
396
397         TALLOC_FREE(msg_ctx->msg_dgm_ref);
398
399         msg_ctx->id = (struct server_id) {
400                 .pid = getpid(), .vnn = msg_ctx->id.vnn
401         };
402
403         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
404                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
405                 private_path("msg.sock"), lock_path("msg.lock"),
406                 messaging_recv_cb, msg_ctx, &ret);
407
408         if (msg_ctx->msg_dgm_ref == NULL) {
409                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
410                 return map_nt_error_from_unix(ret);
411         }
412
413         TALLOC_FREE(msg_ctx->remote);
414
415         if (lp_clustering()) {
416                 ret = messaging_ctdbd_init(msg_ctx, msg_ctx,
417                                            &msg_ctx->remote);
418
419                 if (ret != 0) {
420                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
421                                   strerror(ret)));
422                         return map_nt_error_from_unix(ret);
423                 }
424         }
425
426         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
427
428         return NT_STATUS_OK;
429 }
430
431
432 /*
433  * Register a dispatch function for a particular message type. Allow multiple
434  * registrants
435 */
436 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
437                             void *private_data,
438                             uint32_t msg_type,
439                             void (*fn)(struct messaging_context *msg,
440                                        void *private_data, 
441                                        uint32_t msg_type, 
442                                        struct server_id server_id,
443                                        DATA_BLOB *data))
444 {
445         struct messaging_callback *cb;
446
447         DEBUG(5, ("Registering messaging pointer for type %u - "
448                   "private_data=%p\n",
449                   (unsigned)msg_type, private_data));
450
451         /*
452          * Only one callback per type
453          */
454
455         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
456                 /* we allow a second registration of the same message
457                    type if it has a different private pointer. This is
458                    needed in, for example, the internal notify code,
459                    which creates a new notify context for each tree
460                    connect, and expects to receive messages to each of
461                    them. */
462                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
463                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
464                                   (unsigned)msg_type, private_data));
465                         cb->fn = fn;
466                         cb->private_data = private_data;
467                         return NT_STATUS_OK;
468                 }
469         }
470
471         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
472                 return NT_STATUS_NO_MEMORY;
473         }
474
475         cb->msg_type = msg_type;
476         cb->fn = fn;
477         cb->private_data = private_data;
478
479         DLIST_ADD(msg_ctx->callbacks, cb);
480         return NT_STATUS_OK;
481 }
482
483 /*
484   De-register the function for a particular message type.
485 */
486 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
487                           void *private_data)
488 {
489         struct messaging_callback *cb, *next;
490
491         for (cb = ctx->callbacks; cb; cb = next) {
492                 next = cb->next;
493                 if ((cb->msg_type == msg_type)
494                     && (cb->private_data == private_data)) {
495                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
496                                   (unsigned)msg_type, private_data));
497                         DLIST_REMOVE(ctx->callbacks, cb);
498                         TALLOC_FREE(cb);
499                 }
500         }
501 }
502
503 /*
504   Send a message to a particular server
505 */
506 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
507                         struct server_id server, uint32_t msg_type,
508                         const DATA_BLOB *data)
509 {
510         struct iovec iov;
511
512         iov.iov_base = data->data;
513         iov.iov_len = data->length;
514
515         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
516 }
517
518 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
519                             struct server_id server, uint32_t msg_type,
520                             const uint8_t *buf, size_t len)
521 {
522         DATA_BLOB blob = data_blob_const(buf, len);
523         return messaging_send(msg_ctx, server, msg_type, &blob);
524 }
525
526 int messaging_send_iov_from(struct messaging_context *msg_ctx,
527                             struct server_id src, struct server_id dst,
528                             uint32_t msg_type,
529                             const struct iovec *iov, int iovlen,
530                             const int *fds, size_t num_fds)
531 {
532         int ret;
533         uint8_t hdr[MESSAGE_HDR_LENGTH];
534         struct iovec iov2[iovlen+1];
535
536         if (server_id_is_disconnected(&dst)) {
537                 return EINVAL;
538         }
539
540         if (num_fds > INT8_MAX) {
541                 return EINVAL;
542         }
543
544         if (!procid_is_local(&dst)) {
545                 if (num_fds > 0) {
546                         return ENOSYS;
547                 }
548
549                 ret = msg_ctx->remote->send_fn(src, dst,
550                                                msg_type, iov, iovlen,
551                                                NULL, 0,
552                                                msg_ctx->remote);
553                 return ret;
554         }
555
556         message_hdr_put(hdr, msg_type, src, dst);
557         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
558         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
559
560         become_root();
561         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
562         unbecome_root();
563
564         return ret;
565 }
566
567 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
568                             struct server_id dst, uint32_t msg_type,
569                             const struct iovec *iov, int iovlen,
570                             const int *fds, size_t num_fds)
571 {
572         int ret;
573
574         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
575                                       iov, iovlen, fds, num_fds);
576         if (ret != 0) {
577                 return map_nt_error_from_unix(ret);
578         }
579         return NT_STATUS_OK;
580 }
581
582 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
583                                                struct messaging_rec *rec)
584 {
585         struct messaging_rec *result;
586         size_t fds_size = sizeof(int64_t) * rec->num_fds;
587
588         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
589                                       rec->buf.length + fds_size);
590         if (result == NULL) {
591                 return NULL;
592         }
593         *result = *rec;
594
595         /* Doesn't fail, see talloc_pooled_object */
596
597         result->buf.data = talloc_memdup(result, rec->buf.data,
598                                          rec->buf.length);
599
600         result->fds = NULL;
601         if (result->num_fds > 0) {
602                 result->fds = talloc_memdup(result, rec->fds, fds_size);
603         }
604
605         return result;
606 }
607
608 struct messaging_filtered_read_state {
609         struct tevent_context *ev;
610         struct messaging_context *msg_ctx;
611         void *tevent_handle;
612
613         bool (*filter)(struct messaging_rec *rec, void *private_data);
614         void *private_data;
615
616         struct messaging_rec *rec;
617 };
618
619 static void messaging_filtered_read_cleanup(struct tevent_req *req,
620                                             enum tevent_req_state req_state);
621
622 struct tevent_req *messaging_filtered_read_send(
623         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
624         struct messaging_context *msg_ctx,
625         bool (*filter)(struct messaging_rec *rec, void *private_data),
626         void *private_data)
627 {
628         struct tevent_req *req;
629         struct messaging_filtered_read_state *state;
630         size_t new_waiters_len;
631
632         req = tevent_req_create(mem_ctx, &state,
633                                 struct messaging_filtered_read_state);
634         if (req == NULL) {
635                 return NULL;
636         }
637         state->ev = ev;
638         state->msg_ctx = msg_ctx;
639         state->filter = filter;
640         state->private_data = private_data;
641
642         /*
643          * We have to defer the callback here, as we might be called from
644          * within a different tevent_context than state->ev
645          */
646         tevent_req_defer_callback(req, state->ev);
647
648         state->tevent_handle = messaging_dgm_register_tevent_context(
649                 state, ev);
650         if (tevent_req_nomem(state->tevent_handle, req)) {
651                 return tevent_req_post(req, ev);
652         }
653
654         /*
655          * We add ourselves to the "new_waiters" array, not the "waiters"
656          * array. If we are called from within messaging_read_done,
657          * messaging_dispatch_rec will be in an active for-loop on
658          * "waiters". We must be careful not to mess with this array, because
659          * it could mean that a single event is being delivered twice.
660          */
661
662         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
663
664         if (new_waiters_len == msg_ctx->num_new_waiters) {
665                 struct tevent_req **tmp;
666
667                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
668                                      struct tevent_req *, new_waiters_len+1);
669                 if (tevent_req_nomem(tmp, req)) {
670                         return tevent_req_post(req, ev);
671                 }
672                 msg_ctx->new_waiters = tmp;
673         }
674
675         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
676         msg_ctx->num_new_waiters += 1;
677         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
678
679         return req;
680 }
681
682 static void messaging_filtered_read_cleanup(struct tevent_req *req,
683                                             enum tevent_req_state req_state)
684 {
685         struct messaging_filtered_read_state *state = tevent_req_data(
686                 req, struct messaging_filtered_read_state);
687         struct messaging_context *msg_ctx = state->msg_ctx;
688         unsigned i;
689
690         tevent_req_set_cleanup_fn(req, NULL);
691
692         TALLOC_FREE(state->tevent_handle);
693
694         /*
695          * Just set the [new_]waiters entry to NULL, be careful not to mess
696          * with the other "waiters" array contents. We are often called from
697          * within "messaging_dispatch_rec", which loops over
698          * "waiters". Messing with the "waiters" array will mess up that
699          * for-loop.
700          */
701
702         for (i=0; i<msg_ctx->num_waiters; i++) {
703                 if (msg_ctx->waiters[i] == req) {
704                         msg_ctx->waiters[i] = NULL;
705                         return;
706                 }
707         }
708
709         for (i=0; i<msg_ctx->num_new_waiters; i++) {
710                 if (msg_ctx->new_waiters[i] == req) {
711                         msg_ctx->new_waiters[i] = NULL;
712                         return;
713                 }
714         }
715 }
716
717 static void messaging_filtered_read_done(struct tevent_req *req,
718                                          struct messaging_rec *rec)
719 {
720         struct messaging_filtered_read_state *state = tevent_req_data(
721                 req, struct messaging_filtered_read_state);
722
723         state->rec = messaging_rec_dup(state, rec);
724         if (tevent_req_nomem(state->rec, req)) {
725                 return;
726         }
727         tevent_req_done(req);
728 }
729
730 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
731                                  struct messaging_rec **presult)
732 {
733         struct messaging_filtered_read_state *state = tevent_req_data(
734                 req, struct messaging_filtered_read_state);
735         int err;
736
737         if (tevent_req_is_unix_error(req, &err)) {
738                 tevent_req_received(req);
739                 return err;
740         }
741         *presult = talloc_move(mem_ctx, &state->rec);
742         return 0;
743 }
744
745 struct messaging_read_state {
746         uint32_t msg_type;
747         struct messaging_rec *rec;
748 };
749
750 static bool messaging_read_filter(struct messaging_rec *rec,
751                                   void *private_data);
752 static void messaging_read_done(struct tevent_req *subreq);
753
754 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
755                                        struct tevent_context *ev,
756                                        struct messaging_context *msg,
757                                        uint32_t msg_type)
758 {
759         struct tevent_req *req, *subreq;
760         struct messaging_read_state *state;
761
762         req = tevent_req_create(mem_ctx, &state,
763                                 struct messaging_read_state);
764         if (req == NULL) {
765                 return NULL;
766         }
767         state->msg_type = msg_type;
768
769         subreq = messaging_filtered_read_send(state, ev, msg,
770                                               messaging_read_filter, state);
771         if (tevent_req_nomem(subreq, req)) {
772                 return tevent_req_post(req, ev);
773         }
774         tevent_req_set_callback(subreq, messaging_read_done, req);
775         return req;
776 }
777
778 static bool messaging_read_filter(struct messaging_rec *rec,
779                                   void *private_data)
780 {
781         struct messaging_read_state *state = talloc_get_type_abort(
782                 private_data, struct messaging_read_state);
783
784         if (rec->num_fds != 0) {
785                 return false;
786         }
787
788         return rec->msg_type == state->msg_type;
789 }
790
791 static void messaging_read_done(struct tevent_req *subreq)
792 {
793         struct tevent_req *req = tevent_req_callback_data(
794                 subreq, struct tevent_req);
795         struct messaging_read_state *state = tevent_req_data(
796                 req, struct messaging_read_state);
797         int ret;
798
799         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
800         TALLOC_FREE(subreq);
801         if (tevent_req_error(req, ret)) {
802                 return;
803         }
804         tevent_req_done(req);
805 }
806
807 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
808                         struct messaging_rec **presult)
809 {
810         struct messaging_read_state *state = tevent_req_data(
811                 req, struct messaging_read_state);
812         int err;
813
814         if (tevent_req_is_unix_error(req, &err)) {
815                 return err;
816         }
817         if (presult != NULL) {
818                 *presult = talloc_move(mem_ctx, &state->rec);
819         }
820         return 0;
821 }
822
823 struct messaging_handler_state {
824         struct tevent_context *ev;
825         struct messaging_context *msg_ctx;
826         uint32_t msg_type;
827         bool (*handler)(struct messaging_context *msg_ctx,
828                         struct messaging_rec **rec, void *private_data);
829         void *private_data;
830 };
831
832 static void messaging_handler_got_msg(struct tevent_req *subreq);
833
834 struct tevent_req *messaging_handler_send(
835         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
836         struct messaging_context *msg_ctx, uint32_t msg_type,
837         bool (*handler)(struct messaging_context *msg_ctx,
838                         struct messaging_rec **rec, void *private_data),
839         void *private_data)
840 {
841         struct tevent_req *req, *subreq;
842         struct messaging_handler_state *state;
843
844         req = tevent_req_create(mem_ctx, &state,
845                                 struct messaging_handler_state);
846         if (req == NULL) {
847                 return NULL;
848         }
849         state->ev = ev;
850         state->msg_ctx = msg_ctx;
851         state->msg_type = msg_type;
852         state->handler = handler;
853         state->private_data = private_data;
854
855         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
856                                      state->msg_type);
857         if (tevent_req_nomem(subreq, req)) {
858                 return tevent_req_post(req, ev);
859         }
860         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
861         return req;
862 }
863
864 static void messaging_handler_got_msg(struct tevent_req *subreq)
865 {
866         struct tevent_req *req = tevent_req_callback_data(
867                 subreq, struct tevent_req);
868         struct messaging_handler_state *state = tevent_req_data(
869                 req, struct messaging_handler_state);
870         struct messaging_rec *rec;
871         int ret;
872         bool ok;
873
874         ret = messaging_read_recv(subreq, state, &rec);
875         TALLOC_FREE(subreq);
876         if (tevent_req_error(req, ret)) {
877                 return;
878         }
879
880         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
881                                      state->msg_type);
882         if (tevent_req_nomem(subreq, req)) {
883                 return;
884         }
885         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
886
887         ok = state->handler(state->msg_ctx, &rec, state->private_data);
888         TALLOC_FREE(rec);
889         if (ok) {
890                 /*
891                  * Next round
892                  */
893                 return;
894         }
895         TALLOC_FREE(subreq);
896         tevent_req_done(req);
897 }
898
899 int messaging_handler_recv(struct tevent_req *req)
900 {
901         return tevent_req_simple_recv_unix(req);
902 }
903
904 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
905 {
906         if (msg_ctx->num_new_waiters == 0) {
907                 return true;
908         }
909
910         if (talloc_array_length(msg_ctx->waiters) <
911             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
912                 struct tevent_req **tmp;
913                 tmp = talloc_realloc(
914                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
915                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
916                 if (tmp == NULL) {
917                         DEBUG(1, ("%s: talloc failed\n", __func__));
918                         return false;
919                 }
920                 msg_ctx->waiters = tmp;
921         }
922
923         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
924                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
925
926         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
927         msg_ctx->num_new_waiters = 0;
928
929         return true;
930 }
931
932 /*
933   Dispatch one messaging_rec
934 */
935 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
936                                    struct messaging_rec *rec)
937 {
938         struct messaging_callback *cb, *next;
939         unsigned i;
940         size_t j;
941
942         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
943                 next = cb->next;
944                 if (cb->msg_type != rec->msg_type) {
945                         continue;
946                 }
947
948                 /*
949                  * the old style callbacks don't support fd passing
950                  */
951                 for (j=0; j < rec->num_fds; j++) {
952                         int fd = rec->fds[j];
953                         close(fd);
954                 }
955                 rec->num_fds = 0;
956                 rec->fds = NULL;
957
958                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
959                        rec->src, &rec->buf);
960
961                 /*
962                  * we continue looking for matching messages after finding
963                  * one. This matters for subsystems like the internal notify
964                  * code which register more than one handler for the same
965                  * message type
966                  */
967         }
968
969         if (!messaging_append_new_waiters(msg_ctx)) {
970                 for (j=0; j < rec->num_fds; j++) {
971                         int fd = rec->fds[j];
972                         close(fd);
973                 }
974                 rec->num_fds = 0;
975                 rec->fds = NULL;
976                 return;
977         }
978
979         i = 0;
980         while (i < msg_ctx->num_waiters) {
981                 struct tevent_req *req;
982                 struct messaging_filtered_read_state *state;
983
984                 req = msg_ctx->waiters[i];
985                 if (req == NULL) {
986                         /*
987                          * This got cleaned up. In the meantime,
988                          * move everything down one. We need
989                          * to keep the order of waiters, as
990                          * other code may depend on this.
991                          */
992                         if (i < msg_ctx->num_waiters - 1) {
993                                 memmove(&msg_ctx->waiters[i],
994                                         &msg_ctx->waiters[i+1],
995                                         sizeof(struct tevent_req *) *
996                                             (msg_ctx->num_waiters - i - 1));
997                         }
998                         msg_ctx->num_waiters -= 1;
999                         continue;
1000                 }
1001
1002                 state = tevent_req_data(
1003                         req, struct messaging_filtered_read_state);
1004                 if (state->filter(rec, state->private_data)) {
1005                         messaging_filtered_read_done(req, rec);
1006
1007                         /*
1008                          * Only the first one gets the fd-array
1009                          */
1010                         rec->num_fds = 0;
1011                         rec->fds = NULL;
1012                 }
1013
1014                 i += 1;
1015         }
1016
1017         /*
1018          * If the fd-array isn't used, just close it.
1019          */
1020         for (j=0; j < rec->num_fds; j++) {
1021                 int fd = rec->fds[j];
1022                 close(fd);
1023         }
1024         rec->num_fds = 0;
1025         rec->fds = NULL;
1026 }
1027
1028 static int mess_parent_dgm_cleanup(void *private_data);
1029 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1030
1031 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1032 {
1033         struct tevent_req *req;
1034
1035         req = background_job_send(
1036                 msg, msg->event_ctx, msg, NULL, 0,
1037                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1038                             60*15),
1039                 mess_parent_dgm_cleanup, msg);
1040         if (req == NULL) {
1041                 return false;
1042         }
1043         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1044         return true;
1045 }
1046
1047 static int mess_parent_dgm_cleanup(void *private_data)
1048 {
1049         int ret;
1050
1051         ret = messaging_dgm_wipe();
1052         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1053                    ret ? strerror(ret) : "ok"));
1054         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1055                            60*15);
1056 }
1057
1058 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1059 {
1060         struct messaging_context *msg = tevent_req_callback_data(
1061                 req, struct messaging_context);
1062         NTSTATUS status;
1063
1064         status = background_job_recv(req);
1065         TALLOC_FREE(req);
1066         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1067                   nt_errstr(status)));
1068
1069         req = background_job_send(
1070                 msg, msg->event_ctx, msg, NULL, 0,
1071                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1072                             60*15),
1073                 mess_parent_dgm_cleanup, msg);
1074         if (req == NULL) {
1075                 DEBUG(1, ("background_job_send failed\n"));
1076                 return;
1077         }
1078         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1079 }
1080
1081 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1082 {
1083         int ret;
1084
1085         if (pid == 0) {
1086                 ret = messaging_dgm_wipe();
1087         } else {
1088                 ret = messaging_dgm_cleanup(pid);
1089         }
1090
1091         return ret;
1092 }
1093
1094 struct tevent_context *messaging_tevent_context(
1095         struct messaging_context *msg_ctx)
1096 {
1097         return msg_ctx->event_ctx;
1098 }
1099
1100 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1101 {
1102         return msg_ctx->names_db;
1103 }
1104
1105 /** @} **/