s3:lib/messages: fix error check in messaging_filtered_read_send()
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32_t msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         struct server_id_buf idbuf;
217         struct messaging_rec rec;
218         int64_t fds64[MIN(num_fds, INT8_MAX)];
219         size_t i;
220
221         if (msg_len < MESSAGE_HDR_LENGTH) {
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 goto close_fail;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228                 goto close_fail;
229         }
230
231         /*
232          * "consume" the fds by copying them and setting
233          * the original variable to -1
234          */
235         for (i=0; i < num_fds; i++) {
236                 fds64[i] = fds[i];
237                 fds[i] = -1;
238         }
239
240         rec = (struct messaging_rec) {
241                 .msg_version = MESSAGE_VERSION,
242                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244                 .num_fds = num_fds,
245                 .fds = fds64,
246         };
247
248         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
249
250         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251                    __func__, (unsigned)rec.msg_type,
252                    (unsigned)rec.buf.length,
253                    (unsigned)num_fds,
254                    server_id_str_buf(rec.src, &idbuf)));
255
256         messaging_dispatch_rec(msg_ctx, &rec);
257         return;
258
259 close_fail:
260         for (i=0; i < num_fds; i++) {
261                 close(fds[i]);
262         }
263 }
264
265 static int messaging_context_destructor(struct messaging_context *ctx)
266 {
267         unsigned i;
268
269         for (i=0; i<ctx->num_new_waiters; i++) {
270                 if (ctx->new_waiters[i] != NULL) {
271                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272                         ctx->new_waiters[i] = NULL;
273                 }
274         }
275         for (i=0; i<ctx->num_waiters; i++) {
276                 if (ctx->waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278                         ctx->waiters[i] = NULL;
279                 }
280         }
281
282         return 0;
283 }
284
285 static const char *private_path(const char *name)
286 {
287         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
288 }
289
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
291                                          struct tevent_context *ev)
292 {
293         struct messaging_context *ctx;
294         int ret;
295         const char *lck_path;
296         const char *priv_path;
297         bool ok;
298
299         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
300                 return NULL;
301         }
302
303         ctx->id = procid_self();
304         ctx->event_ctx = ev;
305
306         sec_init();
307
308         lck_path = lock_path("msg.lock");
309         if (lck_path == NULL) {
310                 TALLOC_FREE(ctx);
311                 return NULL;
312         }
313
314         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
315                                               0755);
316         if (!ok) {
317                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
318                            __func__, strerror(errno)));
319                 TALLOC_FREE(ctx);
320                 return NULL;
321         }
322
323         priv_path = private_path("msg.sock");
324         if (priv_path == NULL) {
325                 TALLOC_FREE(ctx);
326                 return NULL;
327         }
328
329         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
330                                               0700);
331         if (!ok) {
332                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
333                            __func__, strerror(errno)));
334                 TALLOC_FREE(ctx);
335                 return NULL;
336         }
337
338         ctx->msg_dgm_ref = messaging_dgm_ref(
339                 ctx, ctx->event_ctx, ctx->id.unique_id,
340                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
341
342         if (ctx->msg_dgm_ref == NULL) {
343                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
344                 TALLOC_FREE(ctx);
345                 return NULL;
346         }
347
348         talloc_set_destructor(ctx, messaging_context_destructor);
349
350         if (lp_clustering()) {
351                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
352
353                 if (ret != 0) {
354                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
355                                   strerror(ret)));
356                         TALLOC_FREE(ctx);
357                         return NULL;
358                 }
359         }
360         ctx->id.vnn = get_my_vnn();
361
362         ctx->names_db = server_id_db_init(
363                 ctx, ctx->id, lp_lock_directory(), 0,
364                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
365         if (ctx->names_db == NULL) {
366                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
367                 TALLOC_FREE(ctx);
368                 return NULL;
369         }
370
371         messaging_register(ctx, NULL, MSG_PING, ping_message);
372
373         /* Register some debugging related messages */
374
375         register_msg_pool_usage(ctx);
376         register_dmalloc_msgs(ctx);
377         debug_register_msgs(ctx);
378
379         return ctx;
380 }
381
382 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
383 {
384         return msg_ctx->id;
385 }
386
387 /*
388  * re-init after a fork
389  */
390 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
391 {
392         int ret;
393
394         TALLOC_FREE(msg_ctx->msg_dgm_ref);
395
396         msg_ctx->id = procid_self();
397
398         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
399                 msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
400                 private_path("msg.sock"), lock_path("msg.lock"),
401                 messaging_recv_cb, msg_ctx, &ret);
402
403         if (msg_ctx->msg_dgm_ref == NULL) {
404                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
405                 return map_nt_error_from_unix(ret);
406         }
407
408         TALLOC_FREE(msg_ctx->remote);
409
410         if (lp_clustering()) {
411                 ret = messaging_ctdbd_init(msg_ctx, msg_ctx,
412                                            &msg_ctx->remote);
413
414                 if (ret != 0) {
415                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
416                                   strerror(ret)));
417                         return map_nt_error_from_unix(ret);
418                 }
419         }
420
421         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
422
423         return NT_STATUS_OK;
424 }
425
426
427 /*
428  * Register a dispatch function for a particular message type. Allow multiple
429  * registrants
430 */
431 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
432                             void *private_data,
433                             uint32_t msg_type,
434                             void (*fn)(struct messaging_context *msg,
435                                        void *private_data, 
436                                        uint32_t msg_type, 
437                                        struct server_id server_id,
438                                        DATA_BLOB *data))
439 {
440         struct messaging_callback *cb;
441
442         DEBUG(5, ("Registering messaging pointer for type %u - "
443                   "private_data=%p\n",
444                   (unsigned)msg_type, private_data));
445
446         /*
447          * Only one callback per type
448          */
449
450         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
451                 /* we allow a second registration of the same message
452                    type if it has a different private pointer. This is
453                    needed in, for example, the internal notify code,
454                    which creates a new notify context for each tree
455                    connect, and expects to receive messages to each of
456                    them. */
457                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
458                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
459                                   (unsigned)msg_type, private_data));
460                         cb->fn = fn;
461                         cb->private_data = private_data;
462                         return NT_STATUS_OK;
463                 }
464         }
465
466         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
467                 return NT_STATUS_NO_MEMORY;
468         }
469
470         cb->msg_type = msg_type;
471         cb->fn = fn;
472         cb->private_data = private_data;
473
474         DLIST_ADD(msg_ctx->callbacks, cb);
475         return NT_STATUS_OK;
476 }
477
478 /*
479   De-register the function for a particular message type.
480 */
481 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
482                           void *private_data)
483 {
484         struct messaging_callback *cb, *next;
485
486         for (cb = ctx->callbacks; cb; cb = next) {
487                 next = cb->next;
488                 if ((cb->msg_type == msg_type)
489                     && (cb->private_data == private_data)) {
490                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
491                                   (unsigned)msg_type, private_data));
492                         DLIST_REMOVE(ctx->callbacks, cb);
493                         TALLOC_FREE(cb);
494                 }
495         }
496 }
497
498 /*
499   Send a message to a particular server
500 */
501 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
502                         struct server_id server, uint32_t msg_type,
503                         const DATA_BLOB *data)
504 {
505         struct iovec iov;
506
507         iov.iov_base = data->data;
508         iov.iov_len = data->length;
509
510         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
511 }
512
513 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
514                             struct server_id server, uint32_t msg_type,
515                             const uint8_t *buf, size_t len)
516 {
517         DATA_BLOB blob = data_blob_const(buf, len);
518         return messaging_send(msg_ctx, server, msg_type, &blob);
519 }
520
521 int messaging_send_iov_from(struct messaging_context *msg_ctx,
522                             struct server_id src, struct server_id dst,
523                             uint32_t msg_type,
524                             const struct iovec *iov, int iovlen,
525                             const int *fds, size_t num_fds)
526 {
527         int ret;
528         uint8_t hdr[MESSAGE_HDR_LENGTH];
529         struct iovec iov2[iovlen+1];
530
531         if (server_id_is_disconnected(&dst)) {
532                 return EINVAL;
533         }
534
535         if (num_fds > INT8_MAX) {
536                 return EINVAL;
537         }
538
539         if (!procid_is_local(&dst)) {
540                 if (num_fds > 0) {
541                         return ENOSYS;
542                 }
543
544                 ret = msg_ctx->remote->send_fn(src, dst,
545                                                msg_type, iov, iovlen,
546                                                NULL, 0,
547                                                msg_ctx->remote);
548                 return ret;
549         }
550
551         message_hdr_put(hdr, msg_type, src, dst);
552         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
553         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
554
555         become_root();
556         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
557         unbecome_root();
558
559         return ret;
560 }
561
562 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
563                             struct server_id dst, uint32_t msg_type,
564                             const struct iovec *iov, int iovlen,
565                             const int *fds, size_t num_fds)
566 {
567         int ret;
568
569         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
570                                       iov, iovlen, fds, num_fds);
571         if (ret != 0) {
572                 return map_nt_error_from_unix(ret);
573         }
574         return NT_STATUS_OK;
575 }
576
577 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
578                                                struct messaging_rec *rec)
579 {
580         struct messaging_rec *result;
581         size_t fds_size = sizeof(int64_t) * rec->num_fds;
582
583         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
584                                       rec->buf.length + fds_size);
585         if (result == NULL) {
586                 return NULL;
587         }
588         *result = *rec;
589
590         /* Doesn't fail, see talloc_pooled_object */
591
592         result->buf.data = talloc_memdup(result, rec->buf.data,
593                                          rec->buf.length);
594
595         result->fds = NULL;
596         if (result->num_fds > 0) {
597                 result->fds = talloc_memdup(result, rec->fds, fds_size);
598         }
599
600         return result;
601 }
602
603 struct messaging_filtered_read_state {
604         struct tevent_context *ev;
605         struct messaging_context *msg_ctx;
606         void *tevent_handle;
607
608         bool (*filter)(struct messaging_rec *rec, void *private_data);
609         void *private_data;
610
611         struct messaging_rec *rec;
612 };
613
614 static void messaging_filtered_read_cleanup(struct tevent_req *req,
615                                             enum tevent_req_state req_state);
616
617 struct tevent_req *messaging_filtered_read_send(
618         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
619         struct messaging_context *msg_ctx,
620         bool (*filter)(struct messaging_rec *rec, void *private_data),
621         void *private_data)
622 {
623         struct tevent_req *req;
624         struct messaging_filtered_read_state *state;
625         size_t new_waiters_len;
626
627         req = tevent_req_create(mem_ctx, &state,
628                                 struct messaging_filtered_read_state);
629         if (req == NULL) {
630                 return NULL;
631         }
632         state->ev = ev;
633         state->msg_ctx = msg_ctx;
634         state->filter = filter;
635         state->private_data = private_data;
636
637         /*
638          * We have to defer the callback here, as we might be called from
639          * within a different tevent_context than state->ev
640          */
641         tevent_req_defer_callback(req, state->ev);
642
643         state->tevent_handle = messaging_dgm_register_tevent_context(
644                 state, ev);
645         if (tevent_req_nomem(state->tevent_handle, req)) {
646                 return tevent_req_post(req, ev);
647         }
648
649         /*
650          * We add ourselves to the "new_waiters" array, not the "waiters"
651          * array. If we are called from within messaging_read_done,
652          * messaging_dispatch_rec will be in an active for-loop on
653          * "waiters". We must be careful not to mess with this array, because
654          * it could mean that a single event is being delivered twice.
655          */
656
657         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
658
659         if (new_waiters_len == msg_ctx->num_new_waiters) {
660                 struct tevent_req **tmp;
661
662                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
663                                      struct tevent_req *, new_waiters_len+1);
664                 if (tevent_req_nomem(tmp, req)) {
665                         return tevent_req_post(req, ev);
666                 }
667                 msg_ctx->new_waiters = tmp;
668         }
669
670         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
671         msg_ctx->num_new_waiters += 1;
672         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
673
674         return req;
675 }
676
677 static void messaging_filtered_read_cleanup(struct tevent_req *req,
678                                             enum tevent_req_state req_state)
679 {
680         struct messaging_filtered_read_state *state = tevent_req_data(
681                 req, struct messaging_filtered_read_state);
682         struct messaging_context *msg_ctx = state->msg_ctx;
683         unsigned i;
684
685         tevent_req_set_cleanup_fn(req, NULL);
686
687         TALLOC_FREE(state->tevent_handle);
688
689         /*
690          * Just set the [new_]waiters entry to NULL, be careful not to mess
691          * with the other "waiters" array contents. We are often called from
692          * within "messaging_dispatch_rec", which loops over
693          * "waiters". Messing with the "waiters" array will mess up that
694          * for-loop.
695          */
696
697         for (i=0; i<msg_ctx->num_waiters; i++) {
698                 if (msg_ctx->waiters[i] == req) {
699                         msg_ctx->waiters[i] = NULL;
700                         return;
701                 }
702         }
703
704         for (i=0; i<msg_ctx->num_new_waiters; i++) {
705                 if (msg_ctx->new_waiters[i] == req) {
706                         msg_ctx->new_waiters[i] = NULL;
707                         return;
708                 }
709         }
710 }
711
712 static void messaging_filtered_read_done(struct tevent_req *req,
713                                          struct messaging_rec *rec)
714 {
715         struct messaging_filtered_read_state *state = tevent_req_data(
716                 req, struct messaging_filtered_read_state);
717
718         state->rec = messaging_rec_dup(state, rec);
719         if (tevent_req_nomem(state->rec, req)) {
720                 return;
721         }
722         tevent_req_done(req);
723 }
724
725 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
726                                  struct messaging_rec **presult)
727 {
728         struct messaging_filtered_read_state *state = tevent_req_data(
729                 req, struct messaging_filtered_read_state);
730         int err;
731
732         if (tevent_req_is_unix_error(req, &err)) {
733                 tevent_req_received(req);
734                 return err;
735         }
736         *presult = talloc_move(mem_ctx, &state->rec);
737         return 0;
738 }
739
740 struct messaging_read_state {
741         uint32_t msg_type;
742         struct messaging_rec *rec;
743 };
744
745 static bool messaging_read_filter(struct messaging_rec *rec,
746                                   void *private_data);
747 static void messaging_read_done(struct tevent_req *subreq);
748
749 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
750                                        struct tevent_context *ev,
751                                        struct messaging_context *msg,
752                                        uint32_t msg_type)
753 {
754         struct tevent_req *req, *subreq;
755         struct messaging_read_state *state;
756
757         req = tevent_req_create(mem_ctx, &state,
758                                 struct messaging_read_state);
759         if (req == NULL) {
760                 return NULL;
761         }
762         state->msg_type = msg_type;
763
764         subreq = messaging_filtered_read_send(state, ev, msg,
765                                               messaging_read_filter, state);
766         if (tevent_req_nomem(subreq, req)) {
767                 return tevent_req_post(req, ev);
768         }
769         tevent_req_set_callback(subreq, messaging_read_done, req);
770         return req;
771 }
772
773 static bool messaging_read_filter(struct messaging_rec *rec,
774                                   void *private_data)
775 {
776         struct messaging_read_state *state = talloc_get_type_abort(
777                 private_data, struct messaging_read_state);
778
779         if (rec->num_fds != 0) {
780                 return false;
781         }
782
783         return rec->msg_type == state->msg_type;
784 }
785
786 static void messaging_read_done(struct tevent_req *subreq)
787 {
788         struct tevent_req *req = tevent_req_callback_data(
789                 subreq, struct tevent_req);
790         struct messaging_read_state *state = tevent_req_data(
791                 req, struct messaging_read_state);
792         int ret;
793
794         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
795         TALLOC_FREE(subreq);
796         if (tevent_req_error(req, ret)) {
797                 return;
798         }
799         tevent_req_done(req);
800 }
801
802 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
803                         struct messaging_rec **presult)
804 {
805         struct messaging_read_state *state = tevent_req_data(
806                 req, struct messaging_read_state);
807         int err;
808
809         if (tevent_req_is_unix_error(req, &err)) {
810                 return err;
811         }
812         if (presult != NULL) {
813                 *presult = talloc_move(mem_ctx, &state->rec);
814         }
815         return 0;
816 }
817
818 struct messaging_handler_state {
819         struct tevent_context *ev;
820         struct messaging_context *msg_ctx;
821         uint32_t msg_type;
822         bool (*handler)(struct messaging_context *msg_ctx,
823                         struct messaging_rec **rec, void *private_data);
824         void *private_data;
825 };
826
827 static void messaging_handler_got_msg(struct tevent_req *subreq);
828
829 struct tevent_req *messaging_handler_send(
830         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
831         struct messaging_context *msg_ctx, uint32_t msg_type,
832         bool (*handler)(struct messaging_context *msg_ctx,
833                         struct messaging_rec **rec, void *private_data),
834         void *private_data)
835 {
836         struct tevent_req *req, *subreq;
837         struct messaging_handler_state *state;
838
839         req = tevent_req_create(mem_ctx, &state,
840                                 struct messaging_handler_state);
841         if (req == NULL) {
842                 return NULL;
843         }
844         state->ev = ev;
845         state->msg_ctx = msg_ctx;
846         state->msg_type = msg_type;
847         state->handler = handler;
848         state->private_data = private_data;
849
850         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
851                                      state->msg_type);
852         if (tevent_req_nomem(subreq, req)) {
853                 return tevent_req_post(req, ev);
854         }
855         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
856         return req;
857 }
858
859 static void messaging_handler_got_msg(struct tevent_req *subreq)
860 {
861         struct tevent_req *req = tevent_req_callback_data(
862                 subreq, struct tevent_req);
863         struct messaging_handler_state *state = tevent_req_data(
864                 req, struct messaging_handler_state);
865         struct messaging_rec *rec;
866         int ret;
867         bool ok;
868
869         ret = messaging_read_recv(subreq, state, &rec);
870         TALLOC_FREE(subreq);
871         if (tevent_req_error(req, ret)) {
872                 return;
873         }
874
875         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
876                                      state->msg_type);
877         if (tevent_req_nomem(subreq, req)) {
878                 return;
879         }
880         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
881
882         ok = state->handler(state->msg_ctx, &rec, state->private_data);
883         TALLOC_FREE(rec);
884         if (ok) {
885                 /*
886                  * Next round
887                  */
888                 return;
889         }
890         TALLOC_FREE(subreq);
891         tevent_req_done(req);
892 }
893
894 int messaging_handler_recv(struct tevent_req *req)
895 {
896         return tevent_req_simple_recv_unix(req);
897 }
898
899 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
900 {
901         if (msg_ctx->num_new_waiters == 0) {
902                 return true;
903         }
904
905         if (talloc_array_length(msg_ctx->waiters) <
906             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
907                 struct tevent_req **tmp;
908                 tmp = talloc_realloc(
909                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
910                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
911                 if (tmp == NULL) {
912                         DEBUG(1, ("%s: talloc failed\n", __func__));
913                         return false;
914                 }
915                 msg_ctx->waiters = tmp;
916         }
917
918         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
919                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
920
921         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
922         msg_ctx->num_new_waiters = 0;
923
924         return true;
925 }
926
927 /*
928   Dispatch one messaging_rec
929 */
930 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
931                                    struct messaging_rec *rec)
932 {
933         struct messaging_callback *cb, *next;
934         unsigned i;
935         size_t j;
936
937         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
938                 next = cb->next;
939                 if (cb->msg_type != rec->msg_type) {
940                         continue;
941                 }
942
943                 /*
944                  * the old style callbacks don't support fd passing
945                  */
946                 for (j=0; j < rec->num_fds; j++) {
947                         int fd = rec->fds[j];
948                         close(fd);
949                 }
950                 rec->num_fds = 0;
951                 rec->fds = NULL;
952
953                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
954                        rec->src, &rec->buf);
955
956                 /*
957                  * we continue looking for matching messages after finding
958                  * one. This matters for subsystems like the internal notify
959                  * code which register more than one handler for the same
960                  * message type
961                  */
962         }
963
964         if (!messaging_append_new_waiters(msg_ctx)) {
965                 for (j=0; j < rec->num_fds; j++) {
966                         int fd = rec->fds[j];
967                         close(fd);
968                 }
969                 rec->num_fds = 0;
970                 rec->fds = NULL;
971                 return;
972         }
973
974         i = 0;
975         while (i < msg_ctx->num_waiters) {
976                 struct tevent_req *req;
977                 struct messaging_filtered_read_state *state;
978
979                 req = msg_ctx->waiters[i];
980                 if (req == NULL) {
981                         /*
982                          * This got cleaned up. In the meantime,
983                          * move everything down one. We need
984                          * to keep the order of waiters, as
985                          * other code may depend on this.
986                          */
987                         if (i < msg_ctx->num_waiters - 1) {
988                                 memmove(&msg_ctx->waiters[i],
989                                         &msg_ctx->waiters[i+1],
990                                         sizeof(struct tevent_req *) *
991                                             (msg_ctx->num_waiters - i - 1));
992                         }
993                         msg_ctx->num_waiters -= 1;
994                         continue;
995                 }
996
997                 state = tevent_req_data(
998                         req, struct messaging_filtered_read_state);
999                 if (state->filter(rec, state->private_data)) {
1000                         messaging_filtered_read_done(req, rec);
1001
1002                         /*
1003                          * Only the first one gets the fd-array
1004                          */
1005                         rec->num_fds = 0;
1006                         rec->fds = NULL;
1007                 }
1008
1009                 i += 1;
1010         }
1011
1012         /*
1013          * If the fd-array isn't used, just close it.
1014          */
1015         for (j=0; j < rec->num_fds; j++) {
1016                 int fd = rec->fds[j];
1017                 close(fd);
1018         }
1019         rec->num_fds = 0;
1020         rec->fds = NULL;
1021 }
1022
1023 static int mess_parent_dgm_cleanup(void *private_data);
1024 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1025
1026 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1027 {
1028         struct tevent_req *req;
1029
1030         req = background_job_send(
1031                 msg, msg->event_ctx, msg, NULL, 0,
1032                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1033                             60*15),
1034                 mess_parent_dgm_cleanup, msg);
1035         if (req == NULL) {
1036                 return false;
1037         }
1038         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1039         return true;
1040 }
1041
1042 static int mess_parent_dgm_cleanup(void *private_data)
1043 {
1044         int ret;
1045
1046         ret = messaging_dgm_wipe();
1047         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1048                    ret ? strerror(ret) : "ok"));
1049         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1050                            60*15);
1051 }
1052
1053 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1054 {
1055         struct messaging_context *msg = tevent_req_callback_data(
1056                 req, struct messaging_context);
1057         NTSTATUS status;
1058
1059         status = background_job_recv(req);
1060         TALLOC_FREE(req);
1061         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1062                   nt_errstr(status)));
1063
1064         req = background_job_send(
1065                 msg, msg->event_ctx, msg, NULL, 0,
1066                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1067                             60*15),
1068                 mess_parent_dgm_cleanup, msg);
1069         if (req == NULL) {
1070                 DEBUG(1, ("background_job_send failed\n"));
1071                 return;
1072         }
1073         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1074 }
1075
1076 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1077 {
1078         int ret;
1079
1080         if (pid == 0) {
1081                 ret = messaging_dgm_wipe();
1082         } else {
1083                 ret = messaging_dgm_cleanup(pid);
1084         }
1085
1086         return ret;
1087 }
1088
1089 struct tevent_context *messaging_tevent_context(
1090         struct messaging_context *msg_ctx)
1091 {
1092         return msg_ctx->event_ctx;
1093 }
1094
1095 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1096 {
1097         return msg_ctx->names_db;
1098 }
1099
1100 /** @} **/