lib: Fix CID 1362566 Dereference null return value
[bbaumbach/samba-autobuild/.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32_t msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         struct server_id_buf idbuf;
217         struct messaging_rec rec;
218         int64_t fds64[MIN(num_fds, INT8_MAX)];
219         size_t i;
220
221         if (msg_len < MESSAGE_HDR_LENGTH) {
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 goto close_fail;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
228                 goto close_fail;
229         }
230
231         /*
232          * "consume" the fds by copying them and setting
233          * the original variable to -1
234          */
235         for (i=0; i < num_fds; i++) {
236                 fds64[i] = fds[i];
237                 fds[i] = -1;
238         }
239
240         rec = (struct messaging_rec) {
241                 .msg_version = MESSAGE_VERSION,
242                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
243                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
244                 .num_fds = num_fds,
245                 .fds = fds64,
246         };
247
248         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
249
250         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
251                    __func__, (unsigned)rec.msg_type,
252                    (unsigned)rec.buf.length,
253                    (unsigned)num_fds,
254                    server_id_str_buf(rec.src, &idbuf)));
255
256         messaging_dispatch_rec(msg_ctx, &rec);
257         return;
258
259 close_fail:
260         for (i=0; i < num_fds; i++) {
261                 close(fds[i]);
262         }
263 }
264
265 static int messaging_context_destructor(struct messaging_context *ctx)
266 {
267         unsigned i;
268
269         for (i=0; i<ctx->num_new_waiters; i++) {
270                 if (ctx->new_waiters[i] != NULL) {
271                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
272                         ctx->new_waiters[i] = NULL;
273                 }
274         }
275         for (i=0; i<ctx->num_waiters; i++) {
276                 if (ctx->waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
278                         ctx->waiters[i] = NULL;
279                 }
280         }
281
282         return 0;
283 }
284
285 static const char *private_path(const char *name)
286 {
287         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
288 }
289
290 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
291                                          struct tevent_context *ev)
292 {
293         struct messaging_context *ctx;
294         int ret;
295         const char *lck_path;
296         const char *priv_path;
297         bool ok;
298
299         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
300                 return NULL;
301         }
302
303         ctx->id = (struct server_id) {
304                 .pid = getpid(), .vnn = NONCLUSTER_VNN
305         };
306
307         ctx->event_ctx = ev;
308
309         sec_init();
310
311         lck_path = lock_path("msg.lock");
312         if (lck_path == NULL) {
313                 TALLOC_FREE(ctx);
314                 return NULL;
315         }
316
317         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
318                                               0755);
319         if (!ok) {
320                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
321                            __func__, strerror(errno)));
322                 TALLOC_FREE(ctx);
323                 return NULL;
324         }
325
326         priv_path = private_path("msg.sock");
327         if (priv_path == NULL) {
328                 TALLOC_FREE(ctx);
329                 return NULL;
330         }
331
332         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
333                                               0700);
334         if (!ok) {
335                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
336                            __func__, strerror(errno)));
337                 TALLOC_FREE(ctx);
338                 return NULL;
339         }
340
341         ctx->msg_dgm_ref = messaging_dgm_ref(
342                 ctx, ctx->event_ctx, &ctx->id.unique_id,
343                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
344
345         if (ctx->msg_dgm_ref == NULL) {
346                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
347                 TALLOC_FREE(ctx);
348                 return NULL;
349         }
350
351         talloc_set_destructor(ctx, messaging_context_destructor);
352
353         if (lp_clustering()) {
354                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
355
356                 if (ret != 0) {
357                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
358                                   strerror(ret)));
359                         TALLOC_FREE(ctx);
360                         return NULL;
361                 }
362         }
363         ctx->id.vnn = get_my_vnn();
364
365         ctx->names_db = server_id_db_init(
366                 ctx, ctx->id, lp_lock_directory(), 0,
367                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
368         if (ctx->names_db == NULL) {
369                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
370                 TALLOC_FREE(ctx);
371                 return NULL;
372         }
373
374         messaging_register(ctx, NULL, MSG_PING, ping_message);
375
376         /* Register some debugging related messages */
377
378         register_msg_pool_usage(ctx);
379         register_dmalloc_msgs(ctx);
380         debug_register_msgs(ctx);
381
382         return ctx;
383 }
384
385 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
386 {
387         return msg_ctx->id;
388 }
389
390 /*
391  * re-init after a fork
392  */
393 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
394 {
395         int ret;
396         char *lck_path;
397
398         TALLOC_FREE(msg_ctx->msg_dgm_ref);
399
400         msg_ctx->id = (struct server_id) {
401                 .pid = getpid(), .vnn = msg_ctx->id.vnn
402         };
403
404         lck_path = lock_path("msg.lock");
405         if (lck_path == NULL) {
406                 return NT_STATUS_NO_MEMORY;
407         }
408
409         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
410                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
411                 private_path("msg.sock"), lck_path,
412                 messaging_recv_cb, msg_ctx, &ret);
413
414         if (msg_ctx->msg_dgm_ref == NULL) {
415                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
416                 return map_nt_error_from_unix(ret);
417         }
418
419         TALLOC_FREE(msg_ctx->remote);
420
421         if (lp_clustering()) {
422                 ret = messaging_ctdbd_init(msg_ctx, msg_ctx,
423                                            &msg_ctx->remote);
424
425                 if (ret != 0) {
426                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
427                                   strerror(ret)));
428                         return map_nt_error_from_unix(ret);
429                 }
430         }
431
432         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
433
434         return NT_STATUS_OK;
435 }
436
437
438 /*
439  * Register a dispatch function for a particular message type. Allow multiple
440  * registrants
441 */
442 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
443                             void *private_data,
444                             uint32_t msg_type,
445                             void (*fn)(struct messaging_context *msg,
446                                        void *private_data, 
447                                        uint32_t msg_type, 
448                                        struct server_id server_id,
449                                        DATA_BLOB *data))
450 {
451         struct messaging_callback *cb;
452
453         DEBUG(5, ("Registering messaging pointer for type %u - "
454                   "private_data=%p\n",
455                   (unsigned)msg_type, private_data));
456
457         /*
458          * Only one callback per type
459          */
460
461         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
462                 /* we allow a second registration of the same message
463                    type if it has a different private pointer. This is
464                    needed in, for example, the internal notify code,
465                    which creates a new notify context for each tree
466                    connect, and expects to receive messages to each of
467                    them. */
468                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
469                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
470                                   (unsigned)msg_type, private_data));
471                         cb->fn = fn;
472                         cb->private_data = private_data;
473                         return NT_STATUS_OK;
474                 }
475         }
476
477         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
478                 return NT_STATUS_NO_MEMORY;
479         }
480
481         cb->msg_type = msg_type;
482         cb->fn = fn;
483         cb->private_data = private_data;
484
485         DLIST_ADD(msg_ctx->callbacks, cb);
486         return NT_STATUS_OK;
487 }
488
489 /*
490   De-register the function for a particular message type.
491 */
492 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
493                           void *private_data)
494 {
495         struct messaging_callback *cb, *next;
496
497         for (cb = ctx->callbacks; cb; cb = next) {
498                 next = cb->next;
499                 if ((cb->msg_type == msg_type)
500                     && (cb->private_data == private_data)) {
501                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
502                                   (unsigned)msg_type, private_data));
503                         DLIST_REMOVE(ctx->callbacks, cb);
504                         TALLOC_FREE(cb);
505                 }
506         }
507 }
508
509 /*
510   Send a message to a particular server
511 */
512 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
513                         struct server_id server, uint32_t msg_type,
514                         const DATA_BLOB *data)
515 {
516         struct iovec iov;
517
518         iov.iov_base = data->data;
519         iov.iov_len = data->length;
520
521         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
522 }
523
524 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
525                             struct server_id server, uint32_t msg_type,
526                             const uint8_t *buf, size_t len)
527 {
528         DATA_BLOB blob = data_blob_const(buf, len);
529         return messaging_send(msg_ctx, server, msg_type, &blob);
530 }
531
532 int messaging_send_iov_from(struct messaging_context *msg_ctx,
533                             struct server_id src, struct server_id dst,
534                             uint32_t msg_type,
535                             const struct iovec *iov, int iovlen,
536                             const int *fds, size_t num_fds)
537 {
538         int ret;
539         uint8_t hdr[MESSAGE_HDR_LENGTH];
540         struct iovec iov2[iovlen+1];
541
542         if (server_id_is_disconnected(&dst)) {
543                 return EINVAL;
544         }
545
546         if (num_fds > INT8_MAX) {
547                 return EINVAL;
548         }
549
550         if (!procid_is_local(&dst)) {
551                 if (num_fds > 0) {
552                         return ENOSYS;
553                 }
554
555                 ret = msg_ctx->remote->send_fn(src, dst,
556                                                msg_type, iov, iovlen,
557                                                NULL, 0,
558                                                msg_ctx->remote);
559                 return ret;
560         }
561
562         message_hdr_put(hdr, msg_type, src, dst);
563         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
564         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
565
566         become_root();
567         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
568         unbecome_root();
569
570         return ret;
571 }
572
573 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
574                             struct server_id dst, uint32_t msg_type,
575                             const struct iovec *iov, int iovlen,
576                             const int *fds, size_t num_fds)
577 {
578         int ret;
579
580         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
581                                       iov, iovlen, fds, num_fds);
582         if (ret != 0) {
583                 return map_nt_error_from_unix(ret);
584         }
585         return NT_STATUS_OK;
586 }
587
588 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
589                                                struct messaging_rec *rec)
590 {
591         struct messaging_rec *result;
592         size_t fds_size = sizeof(int64_t) * rec->num_fds;
593
594         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
595                                       rec->buf.length + fds_size);
596         if (result == NULL) {
597                 return NULL;
598         }
599         *result = *rec;
600
601         /* Doesn't fail, see talloc_pooled_object */
602
603         result->buf.data = talloc_memdup(result, rec->buf.data,
604                                          rec->buf.length);
605
606         result->fds = NULL;
607         if (result->num_fds > 0) {
608                 result->fds = talloc_memdup(result, rec->fds, fds_size);
609         }
610
611         return result;
612 }
613
614 struct messaging_filtered_read_state {
615         struct tevent_context *ev;
616         struct messaging_context *msg_ctx;
617         void *tevent_handle;
618
619         bool (*filter)(struct messaging_rec *rec, void *private_data);
620         void *private_data;
621
622         struct messaging_rec *rec;
623 };
624
625 static void messaging_filtered_read_cleanup(struct tevent_req *req,
626                                             enum tevent_req_state req_state);
627
628 struct tevent_req *messaging_filtered_read_send(
629         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
630         struct messaging_context *msg_ctx,
631         bool (*filter)(struct messaging_rec *rec, void *private_data),
632         void *private_data)
633 {
634         struct tevent_req *req;
635         struct messaging_filtered_read_state *state;
636         size_t new_waiters_len;
637
638         req = tevent_req_create(mem_ctx, &state,
639                                 struct messaging_filtered_read_state);
640         if (req == NULL) {
641                 return NULL;
642         }
643         state->ev = ev;
644         state->msg_ctx = msg_ctx;
645         state->filter = filter;
646         state->private_data = private_data;
647
648         /*
649          * We have to defer the callback here, as we might be called from
650          * within a different tevent_context than state->ev
651          */
652         tevent_req_defer_callback(req, state->ev);
653
654         state->tevent_handle = messaging_dgm_register_tevent_context(
655                 state, ev);
656         if (tevent_req_nomem(state->tevent_handle, req)) {
657                 return tevent_req_post(req, ev);
658         }
659
660         /*
661          * We add ourselves to the "new_waiters" array, not the "waiters"
662          * array. If we are called from within messaging_read_done,
663          * messaging_dispatch_rec will be in an active for-loop on
664          * "waiters". We must be careful not to mess with this array, because
665          * it could mean that a single event is being delivered twice.
666          */
667
668         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
669
670         if (new_waiters_len == msg_ctx->num_new_waiters) {
671                 struct tevent_req **tmp;
672
673                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
674                                      struct tevent_req *, new_waiters_len+1);
675                 if (tevent_req_nomem(tmp, req)) {
676                         return tevent_req_post(req, ev);
677                 }
678                 msg_ctx->new_waiters = tmp;
679         }
680
681         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
682         msg_ctx->num_new_waiters += 1;
683         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
684
685         return req;
686 }
687
688 static void messaging_filtered_read_cleanup(struct tevent_req *req,
689                                             enum tevent_req_state req_state)
690 {
691         struct messaging_filtered_read_state *state = tevent_req_data(
692                 req, struct messaging_filtered_read_state);
693         struct messaging_context *msg_ctx = state->msg_ctx;
694         unsigned i;
695
696         tevent_req_set_cleanup_fn(req, NULL);
697
698         TALLOC_FREE(state->tevent_handle);
699
700         /*
701          * Just set the [new_]waiters entry to NULL, be careful not to mess
702          * with the other "waiters" array contents. We are often called from
703          * within "messaging_dispatch_rec", which loops over
704          * "waiters". Messing with the "waiters" array will mess up that
705          * for-loop.
706          */
707
708         for (i=0; i<msg_ctx->num_waiters; i++) {
709                 if (msg_ctx->waiters[i] == req) {
710                         msg_ctx->waiters[i] = NULL;
711                         return;
712                 }
713         }
714
715         for (i=0; i<msg_ctx->num_new_waiters; i++) {
716                 if (msg_ctx->new_waiters[i] == req) {
717                         msg_ctx->new_waiters[i] = NULL;
718                         return;
719                 }
720         }
721 }
722
723 static void messaging_filtered_read_done(struct tevent_req *req,
724                                          struct messaging_rec *rec)
725 {
726         struct messaging_filtered_read_state *state = tevent_req_data(
727                 req, struct messaging_filtered_read_state);
728
729         state->rec = messaging_rec_dup(state, rec);
730         if (tevent_req_nomem(state->rec, req)) {
731                 return;
732         }
733         tevent_req_done(req);
734 }
735
736 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
737                                  struct messaging_rec **presult)
738 {
739         struct messaging_filtered_read_state *state = tevent_req_data(
740                 req, struct messaging_filtered_read_state);
741         int err;
742
743         if (tevent_req_is_unix_error(req, &err)) {
744                 tevent_req_received(req);
745                 return err;
746         }
747         *presult = talloc_move(mem_ctx, &state->rec);
748         return 0;
749 }
750
751 struct messaging_read_state {
752         uint32_t msg_type;
753         struct messaging_rec *rec;
754 };
755
756 static bool messaging_read_filter(struct messaging_rec *rec,
757                                   void *private_data);
758 static void messaging_read_done(struct tevent_req *subreq);
759
760 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
761                                        struct tevent_context *ev,
762                                        struct messaging_context *msg,
763                                        uint32_t msg_type)
764 {
765         struct tevent_req *req, *subreq;
766         struct messaging_read_state *state;
767
768         req = tevent_req_create(mem_ctx, &state,
769                                 struct messaging_read_state);
770         if (req == NULL) {
771                 return NULL;
772         }
773         state->msg_type = msg_type;
774
775         subreq = messaging_filtered_read_send(state, ev, msg,
776                                               messaging_read_filter, state);
777         if (tevent_req_nomem(subreq, req)) {
778                 return tevent_req_post(req, ev);
779         }
780         tevent_req_set_callback(subreq, messaging_read_done, req);
781         return req;
782 }
783
784 static bool messaging_read_filter(struct messaging_rec *rec,
785                                   void *private_data)
786 {
787         struct messaging_read_state *state = talloc_get_type_abort(
788                 private_data, struct messaging_read_state);
789
790         if (rec->num_fds != 0) {
791                 return false;
792         }
793
794         return rec->msg_type == state->msg_type;
795 }
796
797 static void messaging_read_done(struct tevent_req *subreq)
798 {
799         struct tevent_req *req = tevent_req_callback_data(
800                 subreq, struct tevent_req);
801         struct messaging_read_state *state = tevent_req_data(
802                 req, struct messaging_read_state);
803         int ret;
804
805         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
806         TALLOC_FREE(subreq);
807         if (tevent_req_error(req, ret)) {
808                 return;
809         }
810         tevent_req_done(req);
811 }
812
813 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
814                         struct messaging_rec **presult)
815 {
816         struct messaging_read_state *state = tevent_req_data(
817                 req, struct messaging_read_state);
818         int err;
819
820         if (tevent_req_is_unix_error(req, &err)) {
821                 return err;
822         }
823         if (presult != NULL) {
824                 *presult = talloc_move(mem_ctx, &state->rec);
825         }
826         return 0;
827 }
828
829 struct messaging_handler_state {
830         struct tevent_context *ev;
831         struct messaging_context *msg_ctx;
832         uint32_t msg_type;
833         bool (*handler)(struct messaging_context *msg_ctx,
834                         struct messaging_rec **rec, void *private_data);
835         void *private_data;
836 };
837
838 static void messaging_handler_got_msg(struct tevent_req *subreq);
839
840 struct tevent_req *messaging_handler_send(
841         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
842         struct messaging_context *msg_ctx, uint32_t msg_type,
843         bool (*handler)(struct messaging_context *msg_ctx,
844                         struct messaging_rec **rec, void *private_data),
845         void *private_data)
846 {
847         struct tevent_req *req, *subreq;
848         struct messaging_handler_state *state;
849
850         req = tevent_req_create(mem_ctx, &state,
851                                 struct messaging_handler_state);
852         if (req == NULL) {
853                 return NULL;
854         }
855         state->ev = ev;
856         state->msg_ctx = msg_ctx;
857         state->msg_type = msg_type;
858         state->handler = handler;
859         state->private_data = private_data;
860
861         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
862                                      state->msg_type);
863         if (tevent_req_nomem(subreq, req)) {
864                 return tevent_req_post(req, ev);
865         }
866         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
867         return req;
868 }
869
870 static void messaging_handler_got_msg(struct tevent_req *subreq)
871 {
872         struct tevent_req *req = tevent_req_callback_data(
873                 subreq, struct tevent_req);
874         struct messaging_handler_state *state = tevent_req_data(
875                 req, struct messaging_handler_state);
876         struct messaging_rec *rec;
877         int ret;
878         bool ok;
879
880         ret = messaging_read_recv(subreq, state, &rec);
881         TALLOC_FREE(subreq);
882         if (tevent_req_error(req, ret)) {
883                 return;
884         }
885
886         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
887                                      state->msg_type);
888         if (tevent_req_nomem(subreq, req)) {
889                 return;
890         }
891         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
892
893         ok = state->handler(state->msg_ctx, &rec, state->private_data);
894         TALLOC_FREE(rec);
895         if (ok) {
896                 /*
897                  * Next round
898                  */
899                 return;
900         }
901         TALLOC_FREE(subreq);
902         tevent_req_done(req);
903 }
904
905 int messaging_handler_recv(struct tevent_req *req)
906 {
907         return tevent_req_simple_recv_unix(req);
908 }
909
910 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
911 {
912         if (msg_ctx->num_new_waiters == 0) {
913                 return true;
914         }
915
916         if (talloc_array_length(msg_ctx->waiters) <
917             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
918                 struct tevent_req **tmp;
919                 tmp = talloc_realloc(
920                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
921                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
922                 if (tmp == NULL) {
923                         DEBUG(1, ("%s: talloc failed\n", __func__));
924                         return false;
925                 }
926                 msg_ctx->waiters = tmp;
927         }
928
929         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
930                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
931
932         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
933         msg_ctx->num_new_waiters = 0;
934
935         return true;
936 }
937
938 /*
939   Dispatch one messaging_rec
940 */
941 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
942                                    struct messaging_rec *rec)
943 {
944         struct messaging_callback *cb, *next;
945         unsigned i;
946         size_t j;
947
948         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
949                 next = cb->next;
950                 if (cb->msg_type != rec->msg_type) {
951                         continue;
952                 }
953
954                 /*
955                  * the old style callbacks don't support fd passing
956                  */
957                 for (j=0; j < rec->num_fds; j++) {
958                         int fd = rec->fds[j];
959                         close(fd);
960                 }
961                 rec->num_fds = 0;
962                 rec->fds = NULL;
963
964                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
965                        rec->src, &rec->buf);
966
967                 /*
968                  * we continue looking for matching messages after finding
969                  * one. This matters for subsystems like the internal notify
970                  * code which register more than one handler for the same
971                  * message type
972                  */
973         }
974
975         if (!messaging_append_new_waiters(msg_ctx)) {
976                 for (j=0; j < rec->num_fds; j++) {
977                         int fd = rec->fds[j];
978                         close(fd);
979                 }
980                 rec->num_fds = 0;
981                 rec->fds = NULL;
982                 return;
983         }
984
985         i = 0;
986         while (i < msg_ctx->num_waiters) {
987                 struct tevent_req *req;
988                 struct messaging_filtered_read_state *state;
989
990                 req = msg_ctx->waiters[i];
991                 if (req == NULL) {
992                         /*
993                          * This got cleaned up. In the meantime,
994                          * move everything down one. We need
995                          * to keep the order of waiters, as
996                          * other code may depend on this.
997                          */
998                         if (i < msg_ctx->num_waiters - 1) {
999                                 memmove(&msg_ctx->waiters[i],
1000                                         &msg_ctx->waiters[i+1],
1001                                         sizeof(struct tevent_req *) *
1002                                             (msg_ctx->num_waiters - i - 1));
1003                         }
1004                         msg_ctx->num_waiters -= 1;
1005                         continue;
1006                 }
1007
1008                 state = tevent_req_data(
1009                         req, struct messaging_filtered_read_state);
1010                 if (state->filter(rec, state->private_data)) {
1011                         messaging_filtered_read_done(req, rec);
1012
1013                         /*
1014                          * Only the first one gets the fd-array
1015                          */
1016                         rec->num_fds = 0;
1017                         rec->fds = NULL;
1018                 }
1019
1020                 i += 1;
1021         }
1022
1023         /*
1024          * If the fd-array isn't used, just close it.
1025          */
1026         for (j=0; j < rec->num_fds; j++) {
1027                 int fd = rec->fds[j];
1028                 close(fd);
1029         }
1030         rec->num_fds = 0;
1031         rec->fds = NULL;
1032 }
1033
1034 static int mess_parent_dgm_cleanup(void *private_data);
1035 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1036
1037 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1038 {
1039         struct tevent_req *req;
1040
1041         req = background_job_send(
1042                 msg, msg->event_ctx, msg, NULL, 0,
1043                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1044                             60*15),
1045                 mess_parent_dgm_cleanup, msg);
1046         if (req == NULL) {
1047                 return false;
1048         }
1049         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1050         return true;
1051 }
1052
1053 static int mess_parent_dgm_cleanup(void *private_data)
1054 {
1055         int ret;
1056
1057         ret = messaging_dgm_wipe();
1058         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1059                    ret ? strerror(ret) : "ok"));
1060         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1061                            60*15);
1062 }
1063
1064 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1065 {
1066         struct messaging_context *msg = tevent_req_callback_data(
1067                 req, struct messaging_context);
1068         NTSTATUS status;
1069
1070         status = background_job_recv(req);
1071         TALLOC_FREE(req);
1072         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1073                   nt_errstr(status)));
1074
1075         req = background_job_send(
1076                 msg, msg->event_ctx, msg, NULL, 0,
1077                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1078                             60*15),
1079                 mess_parent_dgm_cleanup, msg);
1080         if (req == NULL) {
1081                 DEBUG(1, ("background_job_send failed\n"));
1082                 return;
1083         }
1084         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1085 }
1086
1087 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1088 {
1089         int ret;
1090
1091         if (pid == 0) {
1092                 ret = messaging_dgm_wipe();
1093         } else {
1094                 ret = messaging_dgm_cleanup(pid);
1095         }
1096
1097         return ret;
1098 }
1099
1100 struct tevent_context *messaging_tevent_context(
1101         struct messaging_context *msg_ctx)
1102 {
1103         return msg_ctx->event_ctx;
1104 }
1105
1106 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1107 {
1108         return msg_ctx->names_db;
1109 }
1110
1111 /** @} **/