messages_dgm: Move directory handling up
[samba.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57
58 struct messaging_callback {
59         struct messaging_callback *prev, *next;
60         uint32 msg_type;
61         void (*fn)(struct messaging_context *msg, void *private_data, 
62                    uint32_t msg_type, 
63                    struct server_id server_id, DATA_BLOB *data);
64         void *private_data;
65 };
66
67 struct messaging_context {
68         struct server_id id;
69         struct tevent_context *event_ctx;
70         struct messaging_callback *callbacks;
71
72         struct tevent_req **new_waiters;
73         unsigned num_new_waiters;
74
75         struct tevent_req **waiters;
76         unsigned num_waiters;
77
78         struct messaging_backend *remote;
79
80         struct server_id_db *names_db;
81 };
82
83 struct messaging_hdr {
84         uint32_t msg_type;
85         struct server_id dst;
86         struct server_id src;
87 };
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32 msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         const struct messaging_hdr *hdr;
217         struct server_id_buf idbuf;
218         struct messaging_rec rec;
219         int64_t fds64[MIN(num_fds, INT8_MAX)];
220         size_t i;
221
222         if (msg_len < sizeof(*hdr)) {
223                 for (i=0; i < num_fds; i++) {
224                         close(fds[i]);
225                 }
226                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
227                 return;
228         }
229
230         if (num_fds > INT8_MAX) {
231                 for (i=0; i < num_fds; i++) {
232                         close(fds[i]);
233                 }
234                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
235                 return;
236         }
237
238         /*
239          * "consume" the fds by copying them and setting
240          * the original variable to -1
241          */
242         for (i=0; i < num_fds; i++) {
243                 fds64[i] = fds[i];
244                 fds[i] = -1;
245         }
246
247         /*
248          * messages_dgm guarantees alignment, so we can cast here
249          */
250         hdr = (const struct messaging_hdr *)msg;
251
252         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
253                    __func__, (unsigned)hdr->msg_type,
254                    (unsigned)(msg_len - sizeof(*hdr)),
255                    (unsigned)num_fds,
256                    server_id_str_buf(hdr->src, &idbuf)));
257
258         rec = (struct messaging_rec) {
259                 .msg_version = MESSAGE_VERSION,
260                 .msg_type = hdr->msg_type,
261                 .src = hdr->src,
262                 .dest = hdr->dst,
263                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
264                 .buf.length = msg_len - sizeof(*hdr),
265                 .num_fds = num_fds,
266                 .fds = fds64,
267         };
268
269         messaging_dispatch_rec(msg_ctx, &rec);
270 }
271
272 static int messaging_context_destructor(struct messaging_context *ctx)
273 {
274         unsigned i;
275
276         messaging_dgm_destroy();
277
278         for (i=0; i<ctx->num_new_waiters; i++) {
279                 if (ctx->new_waiters[i] != NULL) {
280                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
281                         ctx->new_waiters[i] = NULL;
282                 }
283         }
284         for (i=0; i<ctx->num_waiters; i++) {
285                 if (ctx->waiters[i] != NULL) {
286                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
287                         ctx->waiters[i] = NULL;
288                 }
289         }
290
291         return 0;
292 }
293
294 static const char *private_path(const char *name)
295 {
296         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
297 }
298
299 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
300                                          struct tevent_context *ev)
301 {
302         struct messaging_context *ctx;
303         NTSTATUS status;
304         int ret;
305         const char *lck_path;
306         const char *priv_path;
307         bool ok;
308
309         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
310                 return NULL;
311         }
312
313         ctx->id = procid_self();
314         ctx->event_ctx = ev;
315
316         sec_init();
317
318         lck_path = lock_path("msg");
319         if (lck_path == NULL) {
320                 TALLOC_FREE(ctx);
321                 return NULL;
322         }
323
324         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
325                                               0755);
326         if (!ok) {
327                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
328                            __func__, strerror(errno)));
329                 TALLOC_FREE(ctx);
330                 return NULL;
331         }
332
333         priv_path = private_path("sock");
334
335         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
336                                               0700);
337         if (!ok) {
338                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
339                            __func__, strerror(errno)));
340                 TALLOC_FREE(ctx);
341                 return NULL;
342         }
343
344         ret = messaging_dgm_init(ctx->event_ctx, ctx->id.unique_id,
345                                  priv_path, lck_path,
346                                  messaging_recv_cb, ctx);
347
348         if (ret != 0) {
349                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
350                 TALLOC_FREE(ctx);
351                 return NULL;
352         }
353
354         ctx->names_db = server_id_db_init(
355                 ctx, ctx->id, lp_cache_directory(), 0,
356                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
357         if (ctx->names_db == NULL) {
358                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
359                 TALLOC_FREE(ctx);
360                 return NULL;
361         }
362
363         talloc_set_destructor(ctx, messaging_context_destructor);
364
365         if (lp_clustering()) {
366                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
367
368                 if (!NT_STATUS_IS_OK(status)) {
369                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
370                                   nt_errstr(status)));
371                         TALLOC_FREE(ctx);
372                         return NULL;
373                 }
374         }
375         ctx->id.vnn = get_my_vnn();
376
377         messaging_register(ctx, NULL, MSG_PING, ping_message);
378
379         /* Register some debugging related messages */
380
381         register_msg_pool_usage(ctx);
382         register_dmalloc_msgs(ctx);
383         debug_register_msgs(ctx);
384
385         return ctx;
386 }
387
388 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
389 {
390         return msg_ctx->id;
391 }
392
393 /*
394  * re-init after a fork
395  */
396 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
397 {
398         NTSTATUS status;
399         int ret;
400
401         messaging_dgm_destroy();
402
403         msg_ctx->id = procid_self();
404
405         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id.unique_id,
406                                  private_path("sock"), lock_path("msg"),
407                                  messaging_recv_cb, msg_ctx);
408         if (ret != 0) {
409                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
410                 return map_nt_error_from_unix(ret);
411         }
412
413         TALLOC_FREE(msg_ctx->remote);
414
415         if (lp_clustering()) {
416                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
417                                               &msg_ctx->remote);
418
419                 if (!NT_STATUS_IS_OK(status)) {
420                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
421                                   nt_errstr(status)));
422                         return status;
423                 }
424         }
425
426         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
427
428         return NT_STATUS_OK;
429 }
430
431
432 /*
433  * Register a dispatch function for a particular message type. Allow multiple
434  * registrants
435 */
436 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
437                             void *private_data,
438                             uint32_t msg_type,
439                             void (*fn)(struct messaging_context *msg,
440                                        void *private_data, 
441                                        uint32_t msg_type, 
442                                        struct server_id server_id,
443                                        DATA_BLOB *data))
444 {
445         struct messaging_callback *cb;
446
447         DEBUG(5, ("Registering messaging pointer for type %u - "
448                   "private_data=%p\n",
449                   (unsigned)msg_type, private_data));
450
451         /*
452          * Only one callback per type
453          */
454
455         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
456                 /* we allow a second registration of the same message
457                    type if it has a different private pointer. This is
458                    needed in, for example, the internal notify code,
459                    which creates a new notify context for each tree
460                    connect, and expects to receive messages to each of
461                    them. */
462                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
463                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
464                                   (unsigned)msg_type, private_data));
465                         cb->fn = fn;
466                         cb->private_data = private_data;
467                         return NT_STATUS_OK;
468                 }
469         }
470
471         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
472                 return NT_STATUS_NO_MEMORY;
473         }
474
475         cb->msg_type = msg_type;
476         cb->fn = fn;
477         cb->private_data = private_data;
478
479         DLIST_ADD(msg_ctx->callbacks, cb);
480         return NT_STATUS_OK;
481 }
482
483 /*
484   De-register the function for a particular message type.
485 */
486 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
487                           void *private_data)
488 {
489         struct messaging_callback *cb, *next;
490
491         for (cb = ctx->callbacks; cb; cb = next) {
492                 next = cb->next;
493                 if ((cb->msg_type == msg_type)
494                     && (cb->private_data == private_data)) {
495                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
496                                   (unsigned)msg_type, private_data));
497                         DLIST_REMOVE(ctx->callbacks, cb);
498                         TALLOC_FREE(cb);
499                 }
500         }
501 }
502
503 /*
504   Send a message to a particular server
505 */
506 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
507                         struct server_id server, uint32_t msg_type,
508                         const DATA_BLOB *data)
509 {
510         struct iovec iov;
511
512         iov.iov_base = data->data;
513         iov.iov_len = data->length;
514
515         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
516 }
517
518 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
519                             struct server_id server, uint32_t msg_type,
520                             const uint8_t *buf, size_t len)
521 {
522         DATA_BLOB blob = data_blob_const(buf, len);
523         return messaging_send(msg_ctx, server, msg_type, &blob);
524 }
525
526 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
527                                  struct server_id src, struct server_id dst,
528                                  uint32_t msg_type,
529                                  const struct iovec *iov, int iovlen,
530                                  const int *fds, size_t num_fds)
531 {
532         int ret;
533         struct messaging_hdr hdr;
534         struct iovec iov2[iovlen+1];
535
536         if (server_id_is_disconnected(&dst)) {
537                 return NT_STATUS_INVALID_PARAMETER_MIX;
538         }
539
540         if (num_fds > INT8_MAX) {
541                 return NT_STATUS_INVALID_PARAMETER_MIX;
542         }
543
544         if (!procid_is_local(&dst)) {
545                 if (num_fds > 0) {
546                         return NT_STATUS_NOT_SUPPORTED;
547                 }
548
549                 ret = msg_ctx->remote->send_fn(src, dst,
550                                                msg_type, iov, iovlen,
551                                                NULL, 0,
552                                                msg_ctx->remote);
553                 if (ret != 0) {
554                         return map_nt_error_from_unix(ret);
555                 }
556                 return NT_STATUS_OK;
557         }
558
559         ZERO_STRUCT(hdr);
560         hdr = (struct messaging_hdr) {
561                 .msg_type = msg_type,
562                 .dst = dst,
563                 .src = src
564         };
565         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
566         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
567
568         become_root();
569         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
570         unbecome_root();
571
572         if (ret != 0) {
573                 return map_nt_error_from_unix(ret);
574         }
575         return NT_STATUS_OK;
576 }
577
578 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
579                             struct server_id dst, uint32_t msg_type,
580                             const struct iovec *iov, int iovlen,
581                             const int *fds, size_t num_fds)
582 {
583         return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
584                                        iov, iovlen, fds, num_fds);
585 }
586
587 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
588                                                struct messaging_rec *rec)
589 {
590         struct messaging_rec *result;
591         size_t fds_size = sizeof(int64_t) * rec->num_fds;
592
593         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
594                                       rec->buf.length + fds_size);
595         if (result == NULL) {
596                 return NULL;
597         }
598         *result = *rec;
599
600         /* Doesn't fail, see talloc_pooled_object */
601
602         result->buf.data = talloc_memdup(result, rec->buf.data,
603                                          rec->buf.length);
604
605         result->fds = NULL;
606         if (result->num_fds > 0) {
607                 result->fds = talloc_memdup(result, rec->fds, fds_size);
608         }
609
610         return result;
611 }
612
613 struct messaging_filtered_read_state {
614         struct tevent_context *ev;
615         struct messaging_context *msg_ctx;
616         void *tevent_handle;
617
618         bool (*filter)(struct messaging_rec *rec, void *private_data);
619         void *private_data;
620
621         struct messaging_rec *rec;
622 };
623
624 static void messaging_filtered_read_cleanup(struct tevent_req *req,
625                                             enum tevent_req_state req_state);
626
627 struct tevent_req *messaging_filtered_read_send(
628         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
629         struct messaging_context *msg_ctx,
630         bool (*filter)(struct messaging_rec *rec, void *private_data),
631         void *private_data)
632 {
633         struct tevent_req *req;
634         struct messaging_filtered_read_state *state;
635         size_t new_waiters_len;
636
637         req = tevent_req_create(mem_ctx, &state,
638                                 struct messaging_filtered_read_state);
639         if (req == NULL) {
640                 return NULL;
641         }
642         state->ev = ev;
643         state->msg_ctx = msg_ctx;
644         state->filter = filter;
645         state->private_data = private_data;
646
647         /*
648          * We have to defer the callback here, as we might be called from
649          * within a different tevent_context than state->ev
650          */
651         tevent_req_defer_callback(req, state->ev);
652
653         state->tevent_handle = messaging_dgm_register_tevent_context(
654                 state, ev);
655         if (tevent_req_nomem(state, req)) {
656                 return tevent_req_post(req, ev);
657         }
658
659         /*
660          * We add ourselves to the "new_waiters" array, not the "waiters"
661          * array. If we are called from within messaging_read_done,
662          * messaging_dispatch_rec will be in an active for-loop on
663          * "waiters". We must be careful not to mess with this array, because
664          * it could mean that a single event is being delivered twice.
665          */
666
667         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
668
669         if (new_waiters_len == msg_ctx->num_new_waiters) {
670                 struct tevent_req **tmp;
671
672                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
673                                      struct tevent_req *, new_waiters_len+1);
674                 if (tevent_req_nomem(tmp, req)) {
675                         return tevent_req_post(req, ev);
676                 }
677                 msg_ctx->new_waiters = tmp;
678         }
679
680         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
681         msg_ctx->num_new_waiters += 1;
682         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
683
684         return req;
685 }
686
687 static void messaging_filtered_read_cleanup(struct tevent_req *req,
688                                             enum tevent_req_state req_state)
689 {
690         struct messaging_filtered_read_state *state = tevent_req_data(
691                 req, struct messaging_filtered_read_state);
692         struct messaging_context *msg_ctx = state->msg_ctx;
693         unsigned i;
694
695         tevent_req_set_cleanup_fn(req, NULL);
696
697         TALLOC_FREE(state->tevent_handle);
698
699         /*
700          * Just set the [new_]waiters entry to NULL, be careful not to mess
701          * with the other "waiters" array contents. We are often called from
702          * within "messaging_dispatch_rec", which loops over
703          * "waiters". Messing with the "waiters" array will mess up that
704          * for-loop.
705          */
706
707         for (i=0; i<msg_ctx->num_waiters; i++) {
708                 if (msg_ctx->waiters[i] == req) {
709                         msg_ctx->waiters[i] = NULL;
710                         return;
711                 }
712         }
713
714         for (i=0; i<msg_ctx->num_new_waiters; i++) {
715                 if (msg_ctx->new_waiters[i] == req) {
716                         msg_ctx->new_waiters[i] = NULL;
717                         return;
718                 }
719         }
720 }
721
722 static void messaging_filtered_read_done(struct tevent_req *req,
723                                          struct messaging_rec *rec)
724 {
725         struct messaging_filtered_read_state *state = tevent_req_data(
726                 req, struct messaging_filtered_read_state);
727
728         state->rec = messaging_rec_dup(state, rec);
729         if (tevent_req_nomem(state->rec, req)) {
730                 return;
731         }
732         tevent_req_done(req);
733 }
734
735 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
736                                  struct messaging_rec **presult)
737 {
738         struct messaging_filtered_read_state *state = tevent_req_data(
739                 req, struct messaging_filtered_read_state);
740         int err;
741
742         if (tevent_req_is_unix_error(req, &err)) {
743                 tevent_req_received(req);
744                 return err;
745         }
746         *presult = talloc_move(mem_ctx, &state->rec);
747         return 0;
748 }
749
750 struct messaging_read_state {
751         uint32_t msg_type;
752         struct messaging_rec *rec;
753 };
754
755 static bool messaging_read_filter(struct messaging_rec *rec,
756                                   void *private_data);
757 static void messaging_read_done(struct tevent_req *subreq);
758
759 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
760                                        struct tevent_context *ev,
761                                        struct messaging_context *msg,
762                                        uint32_t msg_type)
763 {
764         struct tevent_req *req, *subreq;
765         struct messaging_read_state *state;
766
767         req = tevent_req_create(mem_ctx, &state,
768                                 struct messaging_read_state);
769         if (req == NULL) {
770                 return NULL;
771         }
772         state->msg_type = msg_type;
773
774         subreq = messaging_filtered_read_send(state, ev, msg,
775                                               messaging_read_filter, state);
776         if (tevent_req_nomem(subreq, req)) {
777                 return tevent_req_post(req, ev);
778         }
779         tevent_req_set_callback(subreq, messaging_read_done, req);
780         return req;
781 }
782
783 static bool messaging_read_filter(struct messaging_rec *rec,
784                                   void *private_data)
785 {
786         struct messaging_read_state *state = talloc_get_type_abort(
787                 private_data, struct messaging_read_state);
788
789         if (rec->num_fds != 0) {
790                 return false;
791         }
792
793         return rec->msg_type == state->msg_type;
794 }
795
796 static void messaging_read_done(struct tevent_req *subreq)
797 {
798         struct tevent_req *req = tevent_req_callback_data(
799                 subreq, struct tevent_req);
800         struct messaging_read_state *state = tevent_req_data(
801                 req, struct messaging_read_state);
802         int ret;
803
804         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
805         TALLOC_FREE(subreq);
806         if (tevent_req_error(req, ret)) {
807                 return;
808         }
809         tevent_req_done(req);
810 }
811
812 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
813                         struct messaging_rec **presult)
814 {
815         struct messaging_read_state *state = tevent_req_data(
816                 req, struct messaging_read_state);
817         int err;
818
819         if (tevent_req_is_unix_error(req, &err)) {
820                 return err;
821         }
822         if (presult != NULL) {
823                 *presult = talloc_move(mem_ctx, &state->rec);
824         }
825         return 0;
826 }
827
828 struct messaging_handler_state {
829         struct tevent_context *ev;
830         struct messaging_context *msg_ctx;
831         uint32_t msg_type;
832         bool (*handler)(struct messaging_context *msg_ctx,
833                         struct messaging_rec **rec, void *private_data);
834         void *private_data;
835 };
836
837 static void messaging_handler_got_msg(struct tevent_req *subreq);
838
839 struct tevent_req *messaging_handler_send(
840         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
841         struct messaging_context *msg_ctx, uint32_t msg_type,
842         bool (*handler)(struct messaging_context *msg_ctx,
843                         struct messaging_rec **rec, void *private_data),
844         void *private_data)
845 {
846         struct tevent_req *req, *subreq;
847         struct messaging_handler_state *state;
848
849         req = tevent_req_create(mem_ctx, &state,
850                                 struct messaging_handler_state);
851         if (req == NULL) {
852                 return NULL;
853         }
854         state->ev = ev;
855         state->msg_ctx = msg_ctx;
856         state->msg_type = msg_type;
857         state->handler = handler;
858         state->private_data = private_data;
859
860         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
861                                      state->msg_type);
862         if (tevent_req_nomem(subreq, req)) {
863                 return tevent_req_post(req, ev);
864         }
865         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
866         return req;
867 }
868
869 static void messaging_handler_got_msg(struct tevent_req *subreq)
870 {
871         struct tevent_req *req = tevent_req_callback_data(
872                 subreq, struct tevent_req);
873         struct messaging_handler_state *state = tevent_req_data(
874                 req, struct messaging_handler_state);
875         struct messaging_rec *rec;
876         int ret;
877         bool ok;
878
879         ret = messaging_read_recv(subreq, state, &rec);
880         TALLOC_FREE(subreq);
881         if (tevent_req_error(req, ret)) {
882                 return;
883         }
884
885         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
886                                      state->msg_type);
887         if (tevent_req_nomem(subreq, req)) {
888                 return;
889         }
890         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
891
892         ok = state->handler(state->msg_ctx, &rec, state->private_data);
893         TALLOC_FREE(rec);
894         if (ok) {
895                 /*
896                  * Next round
897                  */
898                 return;
899         }
900         TALLOC_FREE(subreq);
901         tevent_req_done(req);
902 }
903
904 int messaging_handler_recv(struct tevent_req *req)
905 {
906         return tevent_req_simple_recv_unix(req);
907 }
908
909 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
910 {
911         if (msg_ctx->num_new_waiters == 0) {
912                 return true;
913         }
914
915         if (talloc_array_length(msg_ctx->waiters) <
916             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
917                 struct tevent_req **tmp;
918                 tmp = talloc_realloc(
919                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
920                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
921                 if (tmp == NULL) {
922                         DEBUG(1, ("%s: talloc failed\n", __func__));
923                         return false;
924                 }
925                 msg_ctx->waiters = tmp;
926         }
927
928         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
929                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
930
931         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
932         msg_ctx->num_new_waiters = 0;
933
934         return true;
935 }
936
937 /*
938   Dispatch one messaging_rec
939 */
940 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
941                             struct messaging_rec *rec)
942 {
943         struct messaging_callback *cb, *next;
944         unsigned i;
945         size_t j;
946
947         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
948                 next = cb->next;
949                 if (cb->msg_type != rec->msg_type) {
950                         continue;
951                 }
952
953                 /*
954                  * the old style callbacks don't support fd passing
955                  */
956                 for (j=0; j < rec->num_fds; j++) {
957                         int fd = rec->fds[j];
958                         close(fd);
959                 }
960                 rec->num_fds = 0;
961                 rec->fds = NULL;
962
963                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
964                        rec->src, &rec->buf);
965
966                 /*
967                  * we continue looking for matching messages after finding
968                  * one. This matters for subsystems like the internal notify
969                  * code which register more than one handler for the same
970                  * message type
971                  */
972         }
973
974         if (!messaging_append_new_waiters(msg_ctx)) {
975                 for (j=0; j < rec->num_fds; j++) {
976                         int fd = rec->fds[j];
977                         close(fd);
978                 }
979                 rec->num_fds = 0;
980                 rec->fds = NULL;
981                 return;
982         }
983
984         i = 0;
985         while (i < msg_ctx->num_waiters) {
986                 struct tevent_req *req;
987                 struct messaging_filtered_read_state *state;
988
989                 req = msg_ctx->waiters[i];
990                 if (req == NULL) {
991                         /*
992                          * This got cleaned up. In the meantime,
993                          * move everything down one. We need
994                          * to keep the order of waiters, as
995                          * other code may depend on this.
996                          */
997                         if (i < msg_ctx->num_waiters - 1) {
998                                 memmove(&msg_ctx->waiters[i],
999                                         &msg_ctx->waiters[i+1],
1000                                         sizeof(struct tevent_req *) *
1001                                             (msg_ctx->num_waiters - i - 1));
1002                         }
1003                         msg_ctx->num_waiters -= 1;
1004                         continue;
1005                 }
1006
1007                 state = tevent_req_data(
1008                         req, struct messaging_filtered_read_state);
1009                 if (state->filter(rec, state->private_data)) {
1010                         messaging_filtered_read_done(req, rec);
1011
1012                         /*
1013                          * Only the first one gets the fd-array
1014                          */
1015                         rec->num_fds = 0;
1016                         rec->fds = NULL;
1017                 }
1018
1019                 i += 1;
1020         }
1021
1022         /*
1023          * If the fd-array isn't used, just close it.
1024          */
1025         for (j=0; j < rec->num_fds; j++) {
1026                 int fd = rec->fds[j];
1027                 close(fd);
1028         }
1029         rec->num_fds = 0;
1030         rec->fds = NULL;
1031 }
1032
1033 static int mess_parent_dgm_cleanup(void *private_data);
1034 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1035
1036 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1037 {
1038         struct tevent_req *req;
1039
1040         req = background_job_send(
1041                 msg, msg->event_ctx, msg, NULL, 0,
1042                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1043                             60*15),
1044                 mess_parent_dgm_cleanup, msg);
1045         if (req == NULL) {
1046                 return false;
1047         }
1048         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1049         return true;
1050 }
1051
1052 static int mess_parent_dgm_cleanup(void *private_data)
1053 {
1054         int ret;
1055
1056         ret = messaging_dgm_wipe();
1057         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1058                    ret ? strerror(ret) : "ok"));
1059         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1060                            60*15);
1061 }
1062
1063 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1064 {
1065         struct messaging_context *msg = tevent_req_callback_data(
1066                 req, struct messaging_context);
1067         NTSTATUS status;
1068
1069         status = background_job_recv(req);
1070         TALLOC_FREE(req);
1071         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1072                   nt_errstr(status)));
1073
1074         req = background_job_send(
1075                 msg, msg->event_ctx, msg, NULL, 0,
1076                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1077                             60*15),
1078                 mess_parent_dgm_cleanup, msg);
1079         if (req == NULL) {
1080                 DEBUG(1, ("background_job_send failed\n"));
1081         }
1082         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1083 }
1084
1085 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1086 {
1087         int ret;
1088
1089         if (pid == 0) {
1090                 ret = messaging_dgm_wipe();
1091         } else {
1092                 ret = messaging_dgm_cleanup(pid);
1093         }
1094
1095         return ret;
1096 }
1097
1098 struct tevent_context *messaging_tevent_context(
1099         struct messaging_context *msg_ctx)
1100 {
1101         return msg_ctx->event_ctx;
1102 }
1103
1104 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1105 {
1106         return msg_ctx->names_db;
1107 }
1108
1109 /** @} **/