lib: Fix CID 1272858 Copy-paste error
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32_t msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         struct server_id_buf idbuf;
217         struct messaging_rec rec;
218         int64_t fds64[MIN(num_fds, INT8_MAX)];
219         size_t i;
220
221         if (msg_len < MESSAGE_HDR_LENGTH) {
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 goto close_fail;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228                 goto close_fail;
229         }
230
231         /*
232          * "consume" the fds by copying them and setting
233          * the original variable to -1
234          */
235         for (i=0; i < num_fds; i++) {
236                 fds64[i] = fds[i];
237                 fds[i] = -1;
238         }
239
240         rec = (struct messaging_rec) {
241                 .msg_version = MESSAGE_VERSION,
242                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244                 .num_fds = num_fds,
245                 .fds = fds64,
246         };
247
248         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
249
250         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251                    __func__, (unsigned)rec.msg_type,
252                    (unsigned)rec.buf.length,
253                    (unsigned)num_fds,
254                    server_id_str_buf(rec.src, &idbuf)));
255
256         messaging_dispatch_rec(msg_ctx, &rec);
257         return;
258
259 close_fail:
260         for (i=0; i < num_fds; i++) {
261                 close(fds[i]);
262         }
263 }
264
265 static int messaging_context_destructor(struct messaging_context *ctx)
266 {
267         unsigned i;
268
269         for (i=0; i<ctx->num_new_waiters; i++) {
270                 if (ctx->new_waiters[i] != NULL) {
271                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272                         ctx->new_waiters[i] = NULL;
273                 }
274         }
275         for (i=0; i<ctx->num_waiters; i++) {
276                 if (ctx->waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278                         ctx->waiters[i] = NULL;
279                 }
280         }
281
282         return 0;
283 }
284
285 static const char *private_path(const char *name)
286 {
287         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
288 }
289
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
291                                          struct tevent_context *ev)
292 {
293         struct messaging_context *ctx;
294         NTSTATUS status;
295         int ret;
296         const char *lck_path;
297         const char *priv_path;
298         bool ok;
299
300         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
301                 return NULL;
302         }
303
304         ctx->id = procid_self();
305         ctx->event_ctx = ev;
306
307         sec_init();
308
309         lck_path = lock_path("msg");
310         if (lck_path == NULL) {
311                 TALLOC_FREE(ctx);
312                 return NULL;
313         }
314
315         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
316                                               0755);
317         if (!ok) {
318                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
319                            __func__, strerror(errno)));
320                 TALLOC_FREE(ctx);
321                 return NULL;
322         }
323
324         priv_path = private_path("sock");
325
326         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
327                                               0700);
328         if (!ok) {
329                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
330                            __func__, strerror(errno)));
331                 TALLOC_FREE(ctx);
332                 return NULL;
333         }
334
335         ctx->msg_dgm_ref = messaging_dgm_ref(
336                 ctx, ctx->event_ctx, ctx->id.unique_id,
337                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
338
339         if (ctx->msg_dgm_ref == NULL) {
340                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
341                 TALLOC_FREE(ctx);
342                 return NULL;
343         }
344
345         talloc_set_destructor(ctx, messaging_context_destructor);
346
347         if (lp_clustering()) {
348                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
349
350                 if (!NT_STATUS_IS_OK(status)) {
351                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
352                                   nt_errstr(status)));
353                         TALLOC_FREE(ctx);
354                         return NULL;
355                 }
356         }
357         ctx->id.vnn = get_my_vnn();
358
359         ctx->names_db = server_id_db_init(
360                 ctx, ctx->id, lp_lock_directory(), 0,
361                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
362         if (ctx->names_db == NULL) {
363                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
364                 TALLOC_FREE(ctx);
365                 return NULL;
366         }
367
368         messaging_register(ctx, NULL, MSG_PING, ping_message);
369
370         /* Register some debugging related messages */
371
372         register_msg_pool_usage(ctx);
373         register_dmalloc_msgs(ctx);
374         debug_register_msgs(ctx);
375
376         return ctx;
377 }
378
379 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
380 {
381         return msg_ctx->id;
382 }
383
384 /*
385  * re-init after a fork
386  */
387 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
388 {
389         NTSTATUS status;
390         int ret;
391
392         TALLOC_FREE(msg_ctx->msg_dgm_ref);
393
394         msg_ctx->id = procid_self();
395
396         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
397                 msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
398                 private_path("sock"), lock_path("msg"),
399                 messaging_recv_cb, msg_ctx, &ret);
400
401         if (msg_ctx->msg_dgm_ref == NULL) {
402                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
403                 return map_nt_error_from_unix(ret);
404         }
405
406         TALLOC_FREE(msg_ctx->remote);
407
408         if (lp_clustering()) {
409                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
410                                               &msg_ctx->remote);
411
412                 if (!NT_STATUS_IS_OK(status)) {
413                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
414                                   nt_errstr(status)));
415                         return status;
416                 }
417         }
418
419         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
420
421         return NT_STATUS_OK;
422 }
423
424
425 /*
426  * Register a dispatch function for a particular message type. Allow multiple
427  * registrants
428 */
429 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
430                             void *private_data,
431                             uint32_t msg_type,
432                             void (*fn)(struct messaging_context *msg,
433                                        void *private_data, 
434                                        uint32_t msg_type, 
435                                        struct server_id server_id,
436                                        DATA_BLOB *data))
437 {
438         struct messaging_callback *cb;
439
440         DEBUG(5, ("Registering messaging pointer for type %u - "
441                   "private_data=%p\n",
442                   (unsigned)msg_type, private_data));
443
444         /*
445          * Only one callback per type
446          */
447
448         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
449                 /* we allow a second registration of the same message
450                    type if it has a different private pointer. This is
451                    needed in, for example, the internal notify code,
452                    which creates a new notify context for each tree
453                    connect, and expects to receive messages to each of
454                    them. */
455                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
456                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
457                                   (unsigned)msg_type, private_data));
458                         cb->fn = fn;
459                         cb->private_data = private_data;
460                         return NT_STATUS_OK;
461                 }
462         }
463
464         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
465                 return NT_STATUS_NO_MEMORY;
466         }
467
468         cb->msg_type = msg_type;
469         cb->fn = fn;
470         cb->private_data = private_data;
471
472         DLIST_ADD(msg_ctx->callbacks, cb);
473         return NT_STATUS_OK;
474 }
475
476 /*
477   De-register the function for a particular message type.
478 */
479 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
480                           void *private_data)
481 {
482         struct messaging_callback *cb, *next;
483
484         for (cb = ctx->callbacks; cb; cb = next) {
485                 next = cb->next;
486                 if ((cb->msg_type == msg_type)
487                     && (cb->private_data == private_data)) {
488                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
489                                   (unsigned)msg_type, private_data));
490                         DLIST_REMOVE(ctx->callbacks, cb);
491                         TALLOC_FREE(cb);
492                 }
493         }
494 }
495
496 /*
497   Send a message to a particular server
498 */
499 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
500                         struct server_id server, uint32_t msg_type,
501                         const DATA_BLOB *data)
502 {
503         struct iovec iov;
504
505         iov.iov_base = data->data;
506         iov.iov_len = data->length;
507
508         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
509 }
510
511 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
512                             struct server_id server, uint32_t msg_type,
513                             const uint8_t *buf, size_t len)
514 {
515         DATA_BLOB blob = data_blob_const(buf, len);
516         return messaging_send(msg_ctx, server, msg_type, &blob);
517 }
518
519 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
520                                  struct server_id src, struct server_id dst,
521                                  uint32_t msg_type,
522                                  const struct iovec *iov, int iovlen,
523                                  const int *fds, size_t num_fds)
524 {
525         int ret;
526         uint8_t hdr[MESSAGE_HDR_LENGTH];
527         struct iovec iov2[iovlen+1];
528
529         if (server_id_is_disconnected(&dst)) {
530                 return NT_STATUS_INVALID_PARAMETER_MIX;
531         }
532
533         if (num_fds > INT8_MAX) {
534                 return NT_STATUS_INVALID_PARAMETER_MIX;
535         }
536
537         if (!procid_is_local(&dst)) {
538                 if (num_fds > 0) {
539                         return NT_STATUS_NOT_SUPPORTED;
540                 }
541
542                 ret = msg_ctx->remote->send_fn(src, dst,
543                                                msg_type, iov, iovlen,
544                                                NULL, 0,
545                                                msg_ctx->remote);
546                 if (ret != 0) {
547                         return map_nt_error_from_unix(ret);
548                 }
549                 return NT_STATUS_OK;
550         }
551
552         message_hdr_put(hdr, msg_type, src, dst);
553         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
554         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
555
556         become_root();
557         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
558         unbecome_root();
559
560         if (ret != 0) {
561                 return map_nt_error_from_unix(ret);
562         }
563         return NT_STATUS_OK;
564 }
565
566 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
567                             struct server_id dst, uint32_t msg_type,
568                             const struct iovec *iov, int iovlen,
569                             const int *fds, size_t num_fds)
570 {
571         return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
572                                        iov, iovlen, fds, num_fds);
573 }
574
575 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
576                                                struct messaging_rec *rec)
577 {
578         struct messaging_rec *result;
579         size_t fds_size = sizeof(int64_t) * rec->num_fds;
580
581         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
582                                       rec->buf.length + fds_size);
583         if (result == NULL) {
584                 return NULL;
585         }
586         *result = *rec;
587
588         /* Doesn't fail, see talloc_pooled_object */
589
590         result->buf.data = talloc_memdup(result, rec->buf.data,
591                                          rec->buf.length);
592
593         result->fds = NULL;
594         if (result->num_fds > 0) {
595                 result->fds = talloc_memdup(result, rec->fds, fds_size);
596         }
597
598         return result;
599 }
600
601 struct messaging_filtered_read_state {
602         struct tevent_context *ev;
603         struct messaging_context *msg_ctx;
604         void *tevent_handle;
605
606         bool (*filter)(struct messaging_rec *rec, void *private_data);
607         void *private_data;
608
609         struct messaging_rec *rec;
610 };
611
612 static void messaging_filtered_read_cleanup(struct tevent_req *req,
613                                             enum tevent_req_state req_state);
614
615 struct tevent_req *messaging_filtered_read_send(
616         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
617         struct messaging_context *msg_ctx,
618         bool (*filter)(struct messaging_rec *rec, void *private_data),
619         void *private_data)
620 {
621         struct tevent_req *req;
622         struct messaging_filtered_read_state *state;
623         size_t new_waiters_len;
624
625         req = tevent_req_create(mem_ctx, &state,
626                                 struct messaging_filtered_read_state);
627         if (req == NULL) {
628                 return NULL;
629         }
630         state->ev = ev;
631         state->msg_ctx = msg_ctx;
632         state->filter = filter;
633         state->private_data = private_data;
634
635         /*
636          * We have to defer the callback here, as we might be called from
637          * within a different tevent_context than state->ev
638          */
639         tevent_req_defer_callback(req, state->ev);
640
641         state->tevent_handle = messaging_dgm_register_tevent_context(
642                 state, ev);
643         if (tevent_req_nomem(state, req)) {
644                 return tevent_req_post(req, ev);
645         }
646
647         /*
648          * We add ourselves to the "new_waiters" array, not the "waiters"
649          * array. If we are called from within messaging_read_done,
650          * messaging_dispatch_rec will be in an active for-loop on
651          * "waiters". We must be careful not to mess with this array, because
652          * it could mean that a single event is being delivered twice.
653          */
654
655         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
656
657         if (new_waiters_len == msg_ctx->num_new_waiters) {
658                 struct tevent_req **tmp;
659
660                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
661                                      struct tevent_req *, new_waiters_len+1);
662                 if (tevent_req_nomem(tmp, req)) {
663                         return tevent_req_post(req, ev);
664                 }
665                 msg_ctx->new_waiters = tmp;
666         }
667
668         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
669         msg_ctx->num_new_waiters += 1;
670         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
671
672         return req;
673 }
674
675 static void messaging_filtered_read_cleanup(struct tevent_req *req,
676                                             enum tevent_req_state req_state)
677 {
678         struct messaging_filtered_read_state *state = tevent_req_data(
679                 req, struct messaging_filtered_read_state);
680         struct messaging_context *msg_ctx = state->msg_ctx;
681         unsigned i;
682
683         tevent_req_set_cleanup_fn(req, NULL);
684
685         TALLOC_FREE(state->tevent_handle);
686
687         /*
688          * Just set the [new_]waiters entry to NULL, be careful not to mess
689          * with the other "waiters" array contents. We are often called from
690          * within "messaging_dispatch_rec", which loops over
691          * "waiters". Messing with the "waiters" array will mess up that
692          * for-loop.
693          */
694
695         for (i=0; i<msg_ctx->num_waiters; i++) {
696                 if (msg_ctx->waiters[i] == req) {
697                         msg_ctx->waiters[i] = NULL;
698                         return;
699                 }
700         }
701
702         for (i=0; i<msg_ctx->num_new_waiters; i++) {
703                 if (msg_ctx->new_waiters[i] == req) {
704                         msg_ctx->new_waiters[i] = NULL;
705                         return;
706                 }
707         }
708 }
709
710 static void messaging_filtered_read_done(struct tevent_req *req,
711                                          struct messaging_rec *rec)
712 {
713         struct messaging_filtered_read_state *state = tevent_req_data(
714                 req, struct messaging_filtered_read_state);
715
716         state->rec = messaging_rec_dup(state, rec);
717         if (tevent_req_nomem(state->rec, req)) {
718                 return;
719         }
720         tevent_req_done(req);
721 }
722
723 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
724                                  struct messaging_rec **presult)
725 {
726         struct messaging_filtered_read_state *state = tevent_req_data(
727                 req, struct messaging_filtered_read_state);
728         int err;
729
730         if (tevent_req_is_unix_error(req, &err)) {
731                 tevent_req_received(req);
732                 return err;
733         }
734         *presult = talloc_move(mem_ctx, &state->rec);
735         return 0;
736 }
737
738 struct messaging_read_state {
739         uint32_t msg_type;
740         struct messaging_rec *rec;
741 };
742
743 static bool messaging_read_filter(struct messaging_rec *rec,
744                                   void *private_data);
745 static void messaging_read_done(struct tevent_req *subreq);
746
747 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
748                                        struct tevent_context *ev,
749                                        struct messaging_context *msg,
750                                        uint32_t msg_type)
751 {
752         struct tevent_req *req, *subreq;
753         struct messaging_read_state *state;
754
755         req = tevent_req_create(mem_ctx, &state,
756                                 struct messaging_read_state);
757         if (req == NULL) {
758                 return NULL;
759         }
760         state->msg_type = msg_type;
761
762         subreq = messaging_filtered_read_send(state, ev, msg,
763                                               messaging_read_filter, state);
764         if (tevent_req_nomem(subreq, req)) {
765                 return tevent_req_post(req, ev);
766         }
767         tevent_req_set_callback(subreq, messaging_read_done, req);
768         return req;
769 }
770
771 static bool messaging_read_filter(struct messaging_rec *rec,
772                                   void *private_data)
773 {
774         struct messaging_read_state *state = talloc_get_type_abort(
775                 private_data, struct messaging_read_state);
776
777         if (rec->num_fds != 0) {
778                 return false;
779         }
780
781         return rec->msg_type == state->msg_type;
782 }
783
784 static void messaging_read_done(struct tevent_req *subreq)
785 {
786         struct tevent_req *req = tevent_req_callback_data(
787                 subreq, struct tevent_req);
788         struct messaging_read_state *state = tevent_req_data(
789                 req, struct messaging_read_state);
790         int ret;
791
792         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
793         TALLOC_FREE(subreq);
794         if (tevent_req_error(req, ret)) {
795                 return;
796         }
797         tevent_req_done(req);
798 }
799
800 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
801                         struct messaging_rec **presult)
802 {
803         struct messaging_read_state *state = tevent_req_data(
804                 req, struct messaging_read_state);
805         int err;
806
807         if (tevent_req_is_unix_error(req, &err)) {
808                 return err;
809         }
810         if (presult != NULL) {
811                 *presult = talloc_move(mem_ctx, &state->rec);
812         }
813         return 0;
814 }
815
816 struct messaging_handler_state {
817         struct tevent_context *ev;
818         struct messaging_context *msg_ctx;
819         uint32_t msg_type;
820         bool (*handler)(struct messaging_context *msg_ctx,
821                         struct messaging_rec **rec, void *private_data);
822         void *private_data;
823 };
824
825 static void messaging_handler_got_msg(struct tevent_req *subreq);
826
827 struct tevent_req *messaging_handler_send(
828         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
829         struct messaging_context *msg_ctx, uint32_t msg_type,
830         bool (*handler)(struct messaging_context *msg_ctx,
831                         struct messaging_rec **rec, void *private_data),
832         void *private_data)
833 {
834         struct tevent_req *req, *subreq;
835         struct messaging_handler_state *state;
836
837         req = tevent_req_create(mem_ctx, &state,
838                                 struct messaging_handler_state);
839         if (req == NULL) {
840                 return NULL;
841         }
842         state->ev = ev;
843         state->msg_ctx = msg_ctx;
844         state->msg_type = msg_type;
845         state->handler = handler;
846         state->private_data = private_data;
847
848         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
849                                      state->msg_type);
850         if (tevent_req_nomem(subreq, req)) {
851                 return tevent_req_post(req, ev);
852         }
853         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
854         return req;
855 }
856
857 static void messaging_handler_got_msg(struct tevent_req *subreq)
858 {
859         struct tevent_req *req = tevent_req_callback_data(
860                 subreq, struct tevent_req);
861         struct messaging_handler_state *state = tevent_req_data(
862                 req, struct messaging_handler_state);
863         struct messaging_rec *rec;
864         int ret;
865         bool ok;
866
867         ret = messaging_read_recv(subreq, state, &rec);
868         TALLOC_FREE(subreq);
869         if (tevent_req_error(req, ret)) {
870                 return;
871         }
872
873         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
874                                      state->msg_type);
875         if (tevent_req_nomem(subreq, req)) {
876                 return;
877         }
878         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
879
880         ok = state->handler(state->msg_ctx, &rec, state->private_data);
881         TALLOC_FREE(rec);
882         if (ok) {
883                 /*
884                  * Next round
885                  */
886                 return;
887         }
888         TALLOC_FREE(subreq);
889         tevent_req_done(req);
890 }
891
892 int messaging_handler_recv(struct tevent_req *req)
893 {
894         return tevent_req_simple_recv_unix(req);
895 }
896
897 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
898 {
899         if (msg_ctx->num_new_waiters == 0) {
900                 return true;
901         }
902
903         if (talloc_array_length(msg_ctx->waiters) <
904             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
905                 struct tevent_req **tmp;
906                 tmp = talloc_realloc(
907                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
908                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
909                 if (tmp == NULL) {
910                         DEBUG(1, ("%s: talloc failed\n", __func__));
911                         return false;
912                 }
913                 msg_ctx->waiters = tmp;
914         }
915
916         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
917                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
918
919         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
920         msg_ctx->num_new_waiters = 0;
921
922         return true;
923 }
924
925 /*
926   Dispatch one messaging_rec
927 */
928 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
929                                    struct messaging_rec *rec)
930 {
931         struct messaging_callback *cb, *next;
932         unsigned i;
933         size_t j;
934
935         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
936                 next = cb->next;
937                 if (cb->msg_type != rec->msg_type) {
938                         continue;
939                 }
940
941                 /*
942                  * the old style callbacks don't support fd passing
943                  */
944                 for (j=0; j < rec->num_fds; j++) {
945                         int fd = rec->fds[j];
946                         close(fd);
947                 }
948                 rec->num_fds = 0;
949                 rec->fds = NULL;
950
951                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
952                        rec->src, &rec->buf);
953
954                 /*
955                  * we continue looking for matching messages after finding
956                  * one. This matters for subsystems like the internal notify
957                  * code which register more than one handler for the same
958                  * message type
959                  */
960         }
961
962         if (!messaging_append_new_waiters(msg_ctx)) {
963                 for (j=0; j < rec->num_fds; j++) {
964                         int fd = rec->fds[j];
965                         close(fd);
966                 }
967                 rec->num_fds = 0;
968                 rec->fds = NULL;
969                 return;
970         }
971
972         i = 0;
973         while (i < msg_ctx->num_waiters) {
974                 struct tevent_req *req;
975                 struct messaging_filtered_read_state *state;
976
977                 req = msg_ctx->waiters[i];
978                 if (req == NULL) {
979                         /*
980                          * This got cleaned up. In the meantime,
981                          * move everything down one. We need
982                          * to keep the order of waiters, as
983                          * other code may depend on this.
984                          */
985                         if (i < msg_ctx->num_waiters - 1) {
986                                 memmove(&msg_ctx->waiters[i],
987                                         &msg_ctx->waiters[i+1],
988                                         sizeof(struct tevent_req *) *
989                                             (msg_ctx->num_waiters - i - 1));
990                         }
991                         msg_ctx->num_waiters -= 1;
992                         continue;
993                 }
994
995                 state = tevent_req_data(
996                         req, struct messaging_filtered_read_state);
997                 if (state->filter(rec, state->private_data)) {
998                         messaging_filtered_read_done(req, rec);
999
1000                         /*
1001                          * Only the first one gets the fd-array
1002                          */
1003                         rec->num_fds = 0;
1004                         rec->fds = NULL;
1005                 }
1006
1007                 i += 1;
1008         }
1009
1010         /*
1011          * If the fd-array isn't used, just close it.
1012          */
1013         for (j=0; j < rec->num_fds; j++) {
1014                 int fd = rec->fds[j];
1015                 close(fd);
1016         }
1017         rec->num_fds = 0;
1018         rec->fds = NULL;
1019 }
1020
1021 static int mess_parent_dgm_cleanup(void *private_data);
1022 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1023
1024 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1025 {
1026         struct tevent_req *req;
1027
1028         req = background_job_send(
1029                 msg, msg->event_ctx, msg, NULL, 0,
1030                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1031                             60*15),
1032                 mess_parent_dgm_cleanup, msg);
1033         if (req == NULL) {
1034                 return false;
1035         }
1036         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1037         return true;
1038 }
1039
1040 static int mess_parent_dgm_cleanup(void *private_data)
1041 {
1042         int ret;
1043
1044         ret = messaging_dgm_wipe();
1045         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1046                    ret ? strerror(ret) : "ok"));
1047         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1048                            60*15);
1049 }
1050
1051 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1052 {
1053         struct messaging_context *msg = tevent_req_callback_data(
1054                 req, struct messaging_context);
1055         NTSTATUS status;
1056
1057         status = background_job_recv(req);
1058         TALLOC_FREE(req);
1059         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1060                   nt_errstr(status)));
1061
1062         req = background_job_send(
1063                 msg, msg->event_ctx, msg, NULL, 0,
1064                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1065                             60*15),
1066                 mess_parent_dgm_cleanup, msg);
1067         if (req == NULL) {
1068                 DEBUG(1, ("background_job_send failed\n"));
1069                 return;
1070         }
1071         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1072 }
1073
1074 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1075 {
1076         int ret;
1077
1078         if (pid == 0) {
1079                 ret = messaging_dgm_wipe();
1080         } else {
1081                 ret = messaging_dgm_cleanup(pid);
1082         }
1083
1084         return ret;
1085 }
1086
1087 struct tevent_context *messaging_tevent_context(
1088         struct messaging_context *msg_ctx)
1089 {
1090         return msg_ctx->event_ctx;
1091 }
1092
1093 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1094 {
1095         return msg_ctx->names_db;
1096 }
1097
1098 /** @} **/