ctdb: Remove an unnecessary cast
[vlendec/samba-autobuild/.git] / source3 / lib / messages_dgm.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Samba internal messaging functions
4  * Copyright (C) 2013 by Volker Lendecke
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "util/util.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/select.h"
26 #include "lib/util/debug.h"
27 #include "lib/messages_dgm.h"
28 #include "lib/util/genrand.h"
29 #include "lib/util/dlinklist.h"
30 #include "lib/pthreadpool/pthreadpool_tevent.h"
31 #include "lib/util/msghdr.h"
32 #include "lib/util/iov_buf.h"
33 #include "lib/util/blocking.h"
34 #include "lib/util/tevent_unix.h"
35
36 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
37
38 struct sun_path_buf {
39         /*
40          * This will carry enough for a socket path
41          */
42         char buf[sizeof(struct sockaddr_un)];
43 };
44
45 /*
46  * We can only have one tevent_fd per dgm_context and per
47  * tevent_context. Maintain a list of registered tevent_contexts per
48  * dgm_context.
49  */
50 struct messaging_dgm_fde_ev {
51         struct messaging_dgm_fde_ev *prev, *next;
52
53         /*
54          * Backreference to enable DLIST_REMOVE from our
55          * destructor. Also, set to NULL when the dgm_context dies
56          * before the messaging_dgm_fde_ev.
57          */
58         struct messaging_dgm_context *ctx;
59
60         struct tevent_context *ev;
61         struct tevent_fd *fde;
62 };
63
64 struct messaging_dgm_out {
65         struct messaging_dgm_out *prev, *next;
66         struct messaging_dgm_context *ctx;
67
68         pid_t pid;
69         int sock;
70         bool is_blocking;
71         uint64_t cookie;
72
73         struct tevent_queue *queue;
74         struct tevent_timer *idle_timer;
75 };
76
77 struct messaging_dgm_in_msg {
78         struct messaging_dgm_in_msg *prev, *next;
79         struct messaging_dgm_context *ctx;
80         size_t msglen;
81         size_t received;
82         pid_t sender_pid;
83         int sender_sock;
84         uint64_t cookie;
85         uint8_t buf[];
86 };
87
88 struct messaging_dgm_context {
89         struct tevent_context *ev;
90         pid_t pid;
91         struct sun_path_buf socket_dir;
92         struct sun_path_buf lockfile_dir;
93         int lockfile_fd;
94
95         int sock;
96         struct messaging_dgm_in_msg *in_msgs;
97
98         struct messaging_dgm_fde_ev *fde_evs;
99         void (*recv_cb)(struct tevent_context *ev,
100                         const uint8_t *msg,
101                         size_t msg_len,
102                         int *fds,
103                         size_t num_fds,
104                         void *private_data);
105         void *recv_cb_private_data;
106
107         bool *have_dgm_context;
108
109         struct pthreadpool_tevent *pool;
110         struct messaging_dgm_out *outsocks;
111 };
112
113 /* Set socket close on exec. */
114 static int prepare_socket_cloexec(int sock)
115 {
116 #ifdef FD_CLOEXEC
117         int flags;
118
119         flags = fcntl(sock, F_GETFD, 0);
120         if (flags == -1) {
121                 return errno;
122         }
123         flags |= FD_CLOEXEC;
124         if (fcntl(sock, F_SETFD, flags) == -1) {
125                 return errno;
126         }
127 #endif
128         return 0;
129 }
130
131 static void close_fd_array(int *fds, size_t num_fds)
132 {
133         size_t i;
134
135         for (i = 0; i < num_fds; i++) {
136                 if (fds[i] == -1) {
137                         continue;
138                 }
139
140                 close(fds[i]);
141                 fds[i] = -1;
142         }
143 }
144
145 /*
146  * The idle handler can free the struct messaging_dgm_out *,
147  * if it's unused (qlen of zero) which closes the socket.
148  */
149
150 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
151                                            struct tevent_timer *te,
152                                            struct timeval current_time,
153                                            void *private_data)
154 {
155         struct messaging_dgm_out *out = talloc_get_type_abort(
156                 private_data, struct messaging_dgm_out);
157         size_t qlen;
158
159         out->idle_timer = NULL;
160
161         qlen = tevent_queue_length(out->queue);
162         if (qlen == 0) {
163                 TALLOC_FREE(out);
164         }
165 }
166
167 /*
168  * Setup the idle handler to fire afer 1 second if the
169  * queue is zero.
170  */
171
172 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
173 {
174         size_t qlen;
175
176         qlen = tevent_queue_length(out->queue);
177         if (qlen != 0) {
178                 TALLOC_FREE(out->idle_timer);
179                 return;
180         }
181
182         if (out->idle_timer != NULL) {
183                 tevent_update_timer(out->idle_timer,
184                                     tevent_timeval_current_ofs(1, 0));
185                 return;
186         }
187
188         out->idle_timer = tevent_add_timer(
189                 out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
190                 messaging_dgm_out_idle_handler, out);
191         /*
192          * No NULL check, we'll come back here. Worst case we're
193          * leaking a bit.
194          */
195 }
196
197 static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
198 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
199                                            struct tevent_timer *te,
200                                            struct timeval current_time,
201                                            void *private_data);
202
203 /*
204  * Connect to an existing rendezvous point for another
205  * pid - wrapped inside a struct messaging_dgm_out *.
206  */
207
208 static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
209                                     struct messaging_dgm_context *ctx,
210                                     pid_t pid, struct messaging_dgm_out **pout)
211 {
212         struct messaging_dgm_out *out;
213         struct sockaddr_un addr = { .sun_family = AF_UNIX };
214         int ret = ENOMEM;
215         int out_pathlen;
216         char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
217
218         out = talloc(mem_ctx, struct messaging_dgm_out);
219         if (out == NULL) {
220                 goto fail;
221         }
222
223         *out = (struct messaging_dgm_out) {
224                 .pid = pid,
225                 .ctx = ctx,
226                 .cookie = 1
227         };
228
229         out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
230                                "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
231         if (out_pathlen < 0) {
232                 goto errno_fail;
233         }
234         if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
235                 ret = ENAMETOOLONG;
236                 goto fail;
237         }
238
239         memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
240
241         out->queue = tevent_queue_create(out, addr.sun_path);
242         if (out->queue == NULL) {
243                 ret = ENOMEM;
244                 goto fail;
245         }
246
247         out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
248         if (out->sock == -1) {
249                 goto errno_fail;
250         }
251
252         DLIST_ADD(ctx->outsocks, out);
253         talloc_set_destructor(out, messaging_dgm_out_destructor);
254
255         do {
256                 ret = connect(out->sock,
257                               (const struct sockaddr *)(const void *)&addr,
258                               sizeof(addr));
259         } while ((ret == -1) && (errno == EINTR));
260
261         if (ret == -1) {
262                 goto errno_fail;
263         }
264
265         ret = set_blocking(out->sock, false);
266         if (ret == -1) {
267                 goto errno_fail;
268         }
269         out->is_blocking = false;
270
271         *pout = out;
272         return 0;
273 errno_fail:
274         ret = errno;
275 fail:
276         TALLOC_FREE(out);
277         return ret;
278 }
279
280 static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
281 {
282         DLIST_REMOVE(out->ctx->outsocks, out);
283
284         if ((tevent_queue_length(out->queue) != 0) &&
285             (getpid() == out->ctx->pid)) {
286                 /*
287                  * We have pending jobs. We can't close the socket,
288                  * this has been handed over to messaging_dgm_out_queue_state.
289                  */
290                 return 0;
291         }
292
293         if (out->sock != -1) {
294                 close(out->sock);
295                 out->sock = -1;
296         }
297         return 0;
298 }
299
300 /*
301  * Find the struct messaging_dgm_out * to talk to pid.
302  * If we don't have one, create it. Set the timer to
303  * delete after 1 sec.
304  */
305
306 static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
307                                  struct messaging_dgm_out **pout)
308 {
309         struct messaging_dgm_out *out;
310         int ret;
311
312         for (out = ctx->outsocks; out != NULL; out = out->next) {
313                 if (out->pid == pid) {
314                         break;
315                 }
316         }
317
318         if (out == NULL) {
319                 ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
320                 if (ret != 0) {
321                         return ret;
322                 }
323         }
324
325         messaging_dgm_out_rearm_idle_timer(out);
326
327         *pout = out;
328         return 0;
329 }
330
331 /*
332  * This function is called directly to send a message fragment
333  * when the outgoing queue is zero, and from a pthreadpool
334  * job thread when messages are being queued (qlen != 0).
335  * Make sure *ONLY* thread-safe functions are called within.
336  */
337
338 static ssize_t messaging_dgm_sendmsg(int sock,
339                                      const struct iovec *iov, int iovlen,
340                                      const int *fds, size_t num_fds,
341                                      int *perrno)
342 {
343         struct msghdr msg;
344         ssize_t fdlen, ret;
345
346         /*
347          * Do the actual sendmsg syscall. This will be called from a
348          * pthreadpool helper thread, so be careful what you do here.
349          */
350
351         msg = (struct msghdr) {
352                 .msg_iov = discard_const_p(struct iovec, iov),
353                 .msg_iovlen = iovlen
354         };
355
356         fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
357         if (fdlen == -1) {
358                 *perrno = EINVAL;
359                 return -1;
360         }
361
362         {
363                 uint8_t buf[fdlen];
364
365                 msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
366
367                 do {
368                         ret = sendmsg(sock, &msg, 0);
369                 } while ((ret == -1) && (errno == EINTR));
370         }
371
372         if (ret == -1) {
373                 *perrno = errno;
374         }
375         return ret;
376 }
377
378 struct messaging_dgm_out_queue_state {
379         struct tevent_context *ev;
380         struct pthreadpool_tevent *pool;
381
382         struct tevent_req *req;
383         struct tevent_req *subreq;
384
385         int sock;
386
387         int *fds;
388         uint8_t *buf;
389
390         ssize_t sent;
391         int err;
392 };
393
394 static int messaging_dgm_out_queue_state_destructor(
395         struct messaging_dgm_out_queue_state *state);
396 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
397                                            void *private_data);
398 static void messaging_dgm_out_threaded_job(void *private_data);
399 static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
400
401 /*
402  * Push a message fragment onto a queue to be sent by a
403  * threadpool job. Makes copies of data/fd's to be sent.
404  * The running tevent_queue internally creates an immediate
405  * event to schedule the write.
406  */
407
408 static struct tevent_req *messaging_dgm_out_queue_send(
409         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
410         struct messaging_dgm_out *out,
411         const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
412 {
413         struct tevent_req *req;
414         struct messaging_dgm_out_queue_state *state;
415         struct tevent_queue_entry *e;
416         size_t i;
417         ssize_t buflen;
418
419         req = tevent_req_create(out, &state,
420                                 struct messaging_dgm_out_queue_state);
421         if (req == NULL) {
422                 return NULL;
423         }
424         state->ev = ev;
425         state->pool = out->ctx->pool;
426         state->sock = out->sock;
427         state->req = req;
428
429         /*
430          * Go blocking in a thread
431          */
432         if (!out->is_blocking) {
433                 int ret = set_blocking(out->sock, true);
434                 if (ret == -1) {
435                         tevent_req_error(req, errno);
436                         return tevent_req_post(req, ev);
437                 }
438                 out->is_blocking = true;
439         }
440
441         buflen = iov_buflen(iov, iovlen);
442         if (buflen == -1) {
443                 tevent_req_error(req, EMSGSIZE);
444                 return tevent_req_post(req, ev);
445         }
446
447         state->buf = talloc_array(state, uint8_t, buflen);
448         if (tevent_req_nomem(state->buf, req)) {
449                 return tevent_req_post(req, ev);
450         }
451         iov_buf(iov, iovlen, state->buf, buflen);
452
453         state->fds = talloc_array(state, int, num_fds);
454         if (tevent_req_nomem(state->fds, req)) {
455                 return tevent_req_post(req, ev);
456         }
457
458         for (i=0; i<num_fds; i++) {
459                 state->fds[i] = -1;
460         }
461
462         for (i=0; i<num_fds; i++) {
463
464                 state->fds[i] = dup(fds[i]);
465
466                 if (state->fds[i] == -1) {
467                         int ret = errno;
468
469                         close_fd_array(state->fds, num_fds);
470
471                         tevent_req_error(req, ret);
472                         return tevent_req_post(req, ev);
473                 }
474         }
475
476         talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
477
478         e = tevent_queue_add_entry(out->queue, ev, req,
479                                    messaging_dgm_out_queue_trigger, req);
480         if (tevent_req_nomem(e, req)) {
481                 return tevent_req_post(req, ev);
482         }
483         return req;
484 }
485
486 static int messaging_dgm_out_queue_state_destructor(
487         struct messaging_dgm_out_queue_state *state)
488 {
489         int *fds;
490         size_t num_fds;
491
492         if (state->subreq != NULL) {
493                 /*
494                  * We're scheduled, but we're destroyed. This happens
495                  * if the messaging_dgm_context is destroyed while
496                  * we're stuck in a blocking send. There's nothing we
497                  * can do but to leak memory.
498                  */
499                 TALLOC_FREE(state->subreq);
500                 (void)talloc_reparent(state->req, NULL, state);
501                 return -1;
502         }
503
504         fds = state->fds;
505         num_fds = talloc_array_length(fds);
506         close_fd_array(fds, num_fds);
507         return 0;
508 }
509
510 /*
511  * tevent_queue callback that schedules the pthreadpool to actually
512  * send the queued message fragment.
513  */
514
515 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
516                                            void *private_data)
517 {
518         struct messaging_dgm_out_queue_state *state = tevent_req_data(
519                 req, struct messaging_dgm_out_queue_state);
520
521         tevent_req_reset_endtime(req);
522
523         state->subreq = pthreadpool_tevent_job_send(
524                 state, state->ev, state->pool,
525                 messaging_dgm_out_threaded_job, state);
526         if (tevent_req_nomem(state->subreq, req)) {
527                 return;
528         }
529         tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
530                                 req);
531 }
532
533 /*
534  * Wrapper function run by the pthread that calls
535  * messaging_dgm_sendmsg() to actually do the sendmsg().
536  */
537
538 static void messaging_dgm_out_threaded_job(void *private_data)
539 {
540         struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
541                 private_data, struct messaging_dgm_out_queue_state);
542
543         struct iovec iov = { .iov_base = state->buf,
544                              .iov_len = talloc_get_size(state->buf) };
545         size_t num_fds = talloc_array_length(state->fds);
546         int msec = 1;
547
548         while (true) {
549                 int ret;
550
551                 state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
552                                             state->fds, num_fds, &state->err);
553
554                 if (state->sent != -1) {
555                         return;
556                 }
557                 if (state->err != ENOBUFS) {
558                         return;
559                 }
560
561                 /*
562                  * ENOBUFS is the FreeBSD way of saying "Try
563                  * again". We have to do polling.
564                  */
565                 do {
566                         ret = poll(NULL, 0, msec);
567                 } while ((ret == -1) && (errno == EINTR));
568
569                 /*
570                  * Exponential backoff up to once a second
571                  */
572                 msec *= 2;
573                 msec = MIN(msec, 1000);
574         }
575 }
576
577 /*
578  * Pickup the results of the pthread sendmsg().
579  */
580
581 static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
582 {
583         struct tevent_req *req = tevent_req_callback_data(
584                 subreq, struct tevent_req);
585         struct messaging_dgm_out_queue_state *state = tevent_req_data(
586                 req, struct messaging_dgm_out_queue_state);
587         int ret;
588
589         if (subreq != state->subreq) {
590                 abort();
591         }
592
593         ret = pthreadpool_tevent_job_recv(subreq);
594
595         TALLOC_FREE(subreq);
596         state->subreq = NULL;
597
598         if (tevent_req_error(req, ret)) {
599                 return;
600         }
601         if (state->sent == -1) {
602                 tevent_req_error(req, state->err);
603                 return;
604         }
605         tevent_req_done(req);
606 }
607
608 static int messaging_dgm_out_queue_recv(struct tevent_req *req)
609 {
610         return tevent_req_simple_recv_unix(req);
611 }
612
613 static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
614
615 /*
616  * Core function to send a message fragment given a
617  * connected struct messaging_dgm_out * destination.
618  * If no current queue tries to send nonblocking
619  * directly. If not, queues the fragment (which makes
620  * a copy of it) and adds a 60-second timeout on the send.
621  */
622
623 static int messaging_dgm_out_send_fragment(
624         struct tevent_context *ev, struct messaging_dgm_out *out,
625         const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
626 {
627         struct tevent_req *req;
628         size_t qlen;
629         bool ok;
630
631         qlen = tevent_queue_length(out->queue);
632         if (qlen == 0) {
633                 ssize_t nsent;
634                 int err = 0;
635
636                 if (out->is_blocking) {
637                         int ret = set_blocking(out->sock, false);
638                         if (ret == -1) {
639                                 return errno;
640                         }
641                         out->is_blocking = false;
642                 }
643
644                 nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
645                                               num_fds, &err);
646                 if (nsent >= 0) {
647                         return 0;
648                 }
649
650                 if (err == ENOBUFS) {
651                         /*
652                          * FreeBSD's way of telling us the dst socket
653                          * is full. EWOULDBLOCK makes us spawn a
654                          * polling helper thread.
655                          */
656                         err = EWOULDBLOCK;
657                 }
658
659                 if (err != EWOULDBLOCK) {
660                         return err;
661                 }
662         }
663
664         req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
665                                            fds, num_fds);
666         if (req == NULL) {
667                 return ENOMEM;
668         }
669         tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
670
671         ok = tevent_req_set_endtime(req, ev,
672                                     tevent_timeval_current_ofs(60, 0));
673         if (!ok) {
674                 TALLOC_FREE(req);
675                 return ENOMEM;
676         }
677
678         return 0;
679 }
680
681 /*
682  * Pickup the result of the fragment send. Reset idle timer
683  * if queue empty.
684  */
685
686 static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
687 {
688         struct messaging_dgm_out *out = tevent_req_callback_data(
689                 req, struct messaging_dgm_out);
690         int ret;
691
692         ret = messaging_dgm_out_queue_recv(req);
693         TALLOC_FREE(req);
694
695         if (ret != 0) {
696                 DBG_WARNING("messaging_out_queue_recv returned %s\n",
697                             strerror(ret));
698         }
699
700         messaging_dgm_out_rearm_idle_timer(out);
701 }
702
703
704 struct messaging_dgm_fragment_hdr {
705         size_t msglen;
706         pid_t pid;
707         int sock;
708 };
709
710 /*
711  * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
712  * size chunks and send it.
713  *
714  * Message fragments are prefixed by a 64-bit cookie that
715  * stays the same for all fragments. This allows the receiver
716  * to recognise fragments of the same message and re-assemble
717  * them on the other end.
718  *
719  * Note that this allows other message fragments from other
720  * senders to be interleaved in the receive read processing,
721  * the combination of the cookie and header info allows unique
722  * identification of the message from a specific sender in
723  * re-assembly.
724  *
725  * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
726  * then send a single message with cookie set to zero.
727  *
728  * Otherwise the message is fragmented into chunks and added
729  * to the sending queue. Any file descriptors are passed only
730  * in the last fragment.
731  *
732  * Finally the cookie is incremented (wrap over zero) to
733  * prepare for the next message sent to this channel.
734  *
735  */
736
737 static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
738                                              struct messaging_dgm_out *out,
739                                              const struct iovec *iov,
740                                              int iovlen,
741                                              const int *fds, size_t num_fds)
742 {
743         ssize_t msglen, sent;
744         int ret = 0;
745         struct iovec iov_copy[iovlen+2];
746         struct messaging_dgm_fragment_hdr hdr;
747         struct iovec src_iov;
748
749         if (iovlen < 0) {
750                 return EINVAL;
751         }
752
753         msglen = iov_buflen(iov, iovlen);
754         if (msglen == -1) {
755                 return EMSGSIZE;
756         }
757         if (num_fds > INT8_MAX) {
758                 return EINVAL;
759         }
760
761         if ((size_t) msglen <=
762             (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
763                 uint64_t cookie = 0;
764
765                 iov_copy[0].iov_base = &cookie;
766                 iov_copy[0].iov_len = sizeof(cookie);
767                 if (iovlen > 0) {
768                         memcpy(&iov_copy[1], iov,
769                                sizeof(struct iovec) * iovlen);
770                 }
771
772                 return messaging_dgm_out_send_fragment(
773                         ev, out, iov_copy, iovlen+1, fds, num_fds);
774
775         }
776
777         hdr = (struct messaging_dgm_fragment_hdr) {
778                 .msglen = msglen,
779                 .pid = getpid(),
780                 .sock = out->sock
781         };
782
783         iov_copy[0].iov_base = &out->cookie;
784         iov_copy[0].iov_len = sizeof(out->cookie);
785         iov_copy[1].iov_base = &hdr;
786         iov_copy[1].iov_len = sizeof(hdr);
787
788         sent = 0;
789         src_iov = iov[0];
790
791         /*
792          * The following write loop sends the user message in pieces. We have
793          * filled the first two iovecs above with "cookie" and "hdr". In the
794          * following loops we pull message chunks from the user iov array and
795          * fill iov_copy piece by piece, possibly truncating chunks from the
796          * caller's iov array. Ugly, but hopefully efficient.
797          */
798
799         while (sent < msglen) {
800                 size_t fragment_len;
801                 size_t iov_index = 2;
802
803                 fragment_len = sizeof(out->cookie) + sizeof(hdr);
804
805                 while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
806                         size_t space, chunk;
807
808                         space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
809                         chunk = MIN(space, src_iov.iov_len);
810
811                         iov_copy[iov_index].iov_base = src_iov.iov_base;
812                         iov_copy[iov_index].iov_len = chunk;
813                         iov_index += 1;
814
815                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
816                         src_iov.iov_len -= chunk;
817                         fragment_len += chunk;
818
819                         if (src_iov.iov_len == 0) {
820                                 iov += 1;
821                                 iovlen -= 1;
822                                 if (iovlen == 0) {
823                                         break;
824                                 }
825                                 src_iov = iov[0];
826                         }
827                 }
828                 sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
829
830                 /*
831                  * only the last fragment should pass the fd array.
832                  * That simplifies the receiver a lot.
833                  */
834                 if (sent < msglen) {
835                         ret = messaging_dgm_out_send_fragment(
836                                 ev, out, iov_copy, iov_index, NULL, 0);
837                 } else {
838                         ret = messaging_dgm_out_send_fragment(
839                                 ev, out, iov_copy, iov_index, fds, num_fds);
840                 }
841                 if (ret != 0) {
842                         break;
843                 }
844         }
845
846         out->cookie += 1;
847         if (out->cookie == 0) {
848                 out->cookie += 1;
849         }
850
851         return ret;
852 }
853
854 static struct messaging_dgm_context *global_dgm_context;
855
856 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
857
858 static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
859                                          pid_t pid, int *plockfile_fd,
860                                          uint64_t *punique)
861 {
862         char buf[64];
863         int lockfile_fd;
864         struct sun_path_buf lockfile_name;
865         struct flock lck;
866         uint64_t unique;
867         int unique_len, ret;
868         ssize_t written;
869
870         ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
871                        "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
872         if (ret < 0) {
873                 return errno;
874         }
875         if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
876                 return ENAMETOOLONG;
877         }
878
879         /* no O_EXCL, existence check is via the fcntl lock */
880
881         lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
882                            0644);
883
884         if ((lockfile_fd == -1) &&
885             ((errno == ENXIO) /* Linux */ ||
886              (errno == ENODEV) /* Linux kernel bug */ ||
887              (errno == EOPNOTSUPP) /* FreeBSD */)) {
888                 /*
889                  * Huh -- a socket? This might be a stale socket from
890                  * an upgrade of Samba. Just unlink and retry, nobody
891                  * else is supposed to be here at this time.
892                  *
893                  * Yes, this is racy, but I don't see a way to deal
894                  * with this properly.
895                  */
896                 unlink(lockfile_name.buf);
897
898                 lockfile_fd = open(lockfile_name.buf,
899                                    O_NONBLOCK|O_CREAT|O_WRONLY,
900                                    0644);
901         }
902
903         if (lockfile_fd == -1) {
904                 ret = errno;
905                 DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
906                 return ret;
907         }
908
909         lck = (struct flock) {
910                 .l_type = F_WRLCK,
911                 .l_whence = SEEK_SET
912         };
913
914         ret = fcntl(lockfile_fd, F_SETLK, &lck);
915         if (ret == -1) {
916                 ret = errno;
917                 DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
918                 goto fail_close;
919         }
920
921         /*
922          * Directly using the binary value for
923          * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
924          * violation. But including all of ndr here just for this
925          * seems to be a bit overkill to me. Also, messages_dgm might
926          * be replaced sooner or later by something streams-based,
927          * where unique_id generation will be handled differently.
928          */
929
930         do {
931                 generate_random_buffer((uint8_t *)&unique, sizeof(unique));
932         } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
933
934         unique_len = snprintf(buf, sizeof(buf), "%ju\n", (uintmax_t)unique);
935
936         /* shorten a potentially preexisting file */
937
938         ret = ftruncate(lockfile_fd, unique_len);
939         if (ret == -1) {
940                 ret = errno;
941                 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
942                           strerror(ret)));
943                 goto fail_unlink;
944         }
945
946         written = write(lockfile_fd, buf, unique_len);
947         if (written != unique_len) {
948                 ret = errno;
949                 DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
950                 goto fail_unlink;
951         }
952
953         *plockfile_fd = lockfile_fd;
954         *punique = unique;
955         return 0;
956
957 fail_unlink:
958         unlink(lockfile_name.buf);
959 fail_close:
960         close(lockfile_fd);
961         return ret;
962 }
963
964 static void messaging_dgm_read_handler(struct tevent_context *ev,
965                                        struct tevent_fd *fde,
966                                        uint16_t flags,
967                                        void *private_data);
968
969 /*
970  * Create the rendezvous point in the file system
971  * that other processes can use to send messages to
972  * this pid.
973  */
974
975 int messaging_dgm_init(struct tevent_context *ev,
976                        uint64_t *punique,
977                        const char *socket_dir,
978                        const char *lockfile_dir,
979                        void (*recv_cb)(struct tevent_context *ev,
980                                        const uint8_t *msg,
981                                        size_t msg_len,
982                                        int *fds,
983                                        size_t num_fds,
984                                        void *private_data),
985                        void *recv_cb_private_data)
986 {
987         struct messaging_dgm_context *ctx;
988         int ret;
989         struct sockaddr_un socket_address;
990         size_t len;
991         static bool have_dgm_context = false;
992
993         if (have_dgm_context) {
994                 return EEXIST;
995         }
996
997         ctx = talloc_zero(NULL, struct messaging_dgm_context);
998         if (ctx == NULL) {
999                 goto fail_nomem;
1000         }
1001         ctx->ev = ev;
1002         ctx->pid = getpid();
1003         ctx->recv_cb = recv_cb;
1004         ctx->recv_cb_private_data = recv_cb_private_data;
1005
1006         len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1007                       sizeof(ctx->lockfile_dir.buf));
1008         if (len >= sizeof(ctx->lockfile_dir.buf)) {
1009                 TALLOC_FREE(ctx);
1010                 return ENAMETOOLONG;
1011         }
1012
1013         len = strlcpy(ctx->socket_dir.buf, socket_dir,
1014                       sizeof(ctx->socket_dir.buf));
1015         if (len >= sizeof(ctx->socket_dir.buf)) {
1016                 TALLOC_FREE(ctx);
1017                 return ENAMETOOLONG;
1018         }
1019
1020         socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1021         len = snprintf(socket_address.sun_path,
1022                        sizeof(socket_address.sun_path),
1023                        "%s/%u", socket_dir, (unsigned)ctx->pid);
1024         if (len >= sizeof(socket_address.sun_path)) {
1025                 TALLOC_FREE(ctx);
1026                 return ENAMETOOLONG;
1027         }
1028
1029         ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1030                                             punique);
1031         if (ret != 0) {
1032                 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1033                           __func__, strerror(ret)));
1034                 TALLOC_FREE(ctx);
1035                 return ret;
1036         }
1037
1038         unlink(socket_address.sun_path);
1039
1040         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1041         if (ctx->sock == -1) {
1042                 ret = errno;
1043                 DBG_WARNING("socket failed: %s\n", strerror(ret));
1044                 TALLOC_FREE(ctx);
1045                 return ret;
1046         }
1047
1048         ret = prepare_socket_cloexec(ctx->sock);
1049         if (ret == -1) {
1050                 ret = errno;
1051                 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1052                             strerror(ret));
1053                 TALLOC_FREE(ctx);
1054                 return ret;
1055         }
1056
1057         ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1058                    sizeof(socket_address));
1059         if (ret == -1) {
1060                 ret = errno;
1061                 DBG_WARNING("bind failed: %s\n", strerror(ret));
1062                 TALLOC_FREE(ctx);
1063                 return ret;
1064         }
1065
1066         talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1067
1068         ctx->have_dgm_context = &have_dgm_context;
1069
1070         ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1071         if (ret != 0) {
1072                 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1073                             strerror(ret));
1074                 TALLOC_FREE(ctx);
1075                 return ret;
1076         }
1077
1078         global_dgm_context = ctx;
1079         return 0;
1080
1081 fail_nomem:
1082         TALLOC_FREE(ctx);
1083         return ENOMEM;
1084 }
1085
1086 /*
1087  * Remove the rendezvous point in the filesystem
1088  * if we're the owner.
1089  */
1090
1091 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
1092 {
1093         while (c->outsocks != NULL) {
1094                 TALLOC_FREE(c->outsocks);
1095         }
1096         while (c->in_msgs != NULL) {
1097                 TALLOC_FREE(c->in_msgs);
1098         }
1099         while (c->fde_evs != NULL) {
1100                 tevent_fd_set_flags(c->fde_evs->fde, 0);
1101                 c->fde_evs->ctx = NULL;
1102                 DLIST_REMOVE(c->fde_evs, c->fde_evs);
1103         }
1104
1105         close(c->sock);
1106
1107         if (getpid() == c->pid) {
1108                 struct sun_path_buf name;
1109                 int ret;
1110
1111                 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1112                                c->socket_dir.buf, (unsigned)c->pid);
1113                 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1114                         /*
1115                          * We've checked the length when creating, so this
1116                          * should never happen
1117                          */
1118                         abort();
1119                 }
1120                 unlink(name.buf);
1121
1122                 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1123                                c->lockfile_dir.buf, (unsigned)c->pid);
1124                 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1125                         /*
1126                          * We've checked the length when creating, so this
1127                          * should never happen
1128                          */
1129                         abort();
1130                 }
1131                 unlink(name.buf);
1132         }
1133         close(c->lockfile_fd);
1134
1135         if (c->have_dgm_context != NULL) {
1136                 *c->have_dgm_context = false;
1137         }
1138
1139         return 0;
1140 }
1141
1142 static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
1143 {
1144 #ifdef DEVELOPER
1145         pid_t pid = getpid();
1146         struct sockaddr_storage addr;
1147         socklen_t addrlen = sizeof(addr);
1148         struct sockaddr_un *un_addr;
1149         struct sun_path_buf pathbuf;
1150         struct stat st1, st2;
1151         int ret;
1152
1153         /*
1154          * Protect against using the wrong messaging context after a
1155          * fork without reinit_after_fork.
1156          */
1157
1158         ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
1159         if (ret == -1) {
1160                 DBG_ERR("getsockname failed: %s\n", strerror(errno));
1161                 goto fail;
1162         }
1163         if (addr.ss_family != AF_UNIX) {
1164                 DBG_ERR("getsockname returned family %d\n",
1165                         (int)addr.ss_family);
1166                 goto fail;
1167         }
1168         un_addr = (struct sockaddr_un *)&addr;
1169
1170         ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
1171                        "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
1172         if (ret < 0) {
1173                 DBG_ERR("snprintf failed: %s\n", strerror(errno));
1174                 goto fail;
1175         }
1176         if ((size_t)ret >= sizeof(pathbuf.buf)) {
1177                 DBG_ERR("snprintf returned %d chars\n", (int)ret);
1178                 goto fail;
1179         }
1180
1181         if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
1182                 DBG_ERR("sockname wrong: Expected %s, got %s\n",
1183                         pathbuf.buf, un_addr->sun_path);
1184                 goto fail;
1185         }
1186
1187         ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
1188                        "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
1189         if (ret < 0) {
1190                 DBG_ERR("snprintf failed: %s\n", strerror(errno));
1191                 goto fail;
1192         }
1193         if ((size_t)ret >= sizeof(pathbuf.buf)) {
1194                 DBG_ERR("snprintf returned %d chars\n", (int)ret);
1195                 goto fail;
1196         }
1197
1198         ret = stat(pathbuf.buf, &st1);
1199         if (ret == -1) {
1200                 DBG_ERR("stat failed: %s\n", strerror(errno));
1201                 goto fail;
1202         }
1203         ret = fstat(ctx->lockfile_fd, &st2);
1204         if (ret == -1) {
1205                 DBG_ERR("fstat failed: %s\n", strerror(errno));
1206                 goto fail;
1207         }
1208
1209         if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
1210                 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
1211                         (int)st2.st_dev, (int)st2.st_ino,
1212                         (int)st1.st_dev, (int)st1.st_ino);
1213                 goto fail;
1214         }
1215
1216         return;
1217 fail:
1218         abort();
1219 #else
1220         return;
1221 #endif
1222 }
1223
1224 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1225                                struct tevent_context *ev,
1226                                uint8_t *msg, size_t msg_len,
1227                                int *fds, size_t num_fds);
1228
1229 /*
1230  * Raw read callback handler - passes to messaging_dgm_recv()
1231  * for fragment reassembly processing.
1232  */
1233
1234 static void messaging_dgm_read_handler(struct tevent_context *ev,
1235                                        struct tevent_fd *fde,
1236                                        uint16_t flags,
1237                                        void *private_data)
1238 {
1239         struct messaging_dgm_context *ctx = talloc_get_type_abort(
1240                 private_data, struct messaging_dgm_context);
1241         ssize_t received;
1242         struct msghdr msg;
1243         struct iovec iov;
1244         size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
1245         uint8_t msgbuf[msgbufsize];
1246         uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
1247         size_t num_fds;
1248
1249         messaging_dgm_validate(ctx);
1250
1251         if ((flags & TEVENT_FD_READ) == 0) {
1252                 return;
1253         }
1254
1255         iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
1256         msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
1257
1258         msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
1259
1260 #ifdef MSG_CMSG_CLOEXEC
1261         msg.msg_flags |= MSG_CMSG_CLOEXEC;
1262 #endif
1263
1264         received = recvmsg(ctx->sock, &msg, 0);
1265         if (received == -1) {
1266                 if ((errno == EAGAIN) ||
1267                     (errno == EWOULDBLOCK) ||
1268                     (errno == EINTR) ||
1269                     (errno == ENOMEM)) {
1270                         /* Not really an error - just try again. */
1271                         return;
1272                 }
1273                 /* Problem with the socket. Set it unreadable. */
1274                 tevent_fd_set_flags(fde, 0);
1275                 return;
1276         }
1277
1278         if ((size_t)received > sizeof(buf)) {
1279                 /* More than we expected, not for us */
1280                 return;
1281         }
1282
1283         num_fds = msghdr_extract_fds(&msg, NULL, 0);
1284         if (num_fds == 0) {
1285                 int fds[1];
1286
1287                 messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
1288         } else {
1289                 size_t i;
1290                 int fds[num_fds];
1291
1292                 msghdr_extract_fds(&msg, fds, num_fds);
1293
1294                 for (i = 0; i < num_fds; i++) {
1295                         int err;
1296
1297                         err = prepare_socket_cloexec(fds[i]);
1298                         if (err != 0) {
1299                                 close_fd_array(fds, num_fds);
1300                                 num_fds = 0;
1301                         }
1302                 }
1303
1304                 messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
1305         }
1306 }
1307
1308 static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
1309 {
1310         DLIST_REMOVE(m->ctx->in_msgs, m);
1311         return 0;
1312 }
1313
1314 /*
1315  * Deal with identification of fragmented messages and
1316  * re-assembly into full messages sent, then calls the
1317  * callback.
1318  */
1319
1320 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1321                                struct tevent_context *ev,
1322                                uint8_t *buf, size_t buflen,
1323                                int *fds, size_t num_fds)
1324 {
1325         struct messaging_dgm_fragment_hdr hdr;
1326         struct messaging_dgm_in_msg *msg;
1327         size_t space;
1328         uint64_t cookie;
1329
1330         if (buflen < sizeof(cookie)) {
1331                 goto close_fds;
1332         }
1333         memcpy(&cookie, buf, sizeof(cookie));
1334         buf += sizeof(cookie);
1335         buflen -= sizeof(cookie);
1336
1337         if (cookie == 0) {
1338                 ctx->recv_cb(ev, buf, buflen, fds, num_fds,
1339                              ctx->recv_cb_private_data);
1340                 return;
1341         }
1342
1343         if (buflen < sizeof(hdr)) {
1344                 goto close_fds;
1345         }
1346         memcpy(&hdr, buf, sizeof(hdr));
1347         buf += sizeof(hdr);
1348         buflen -= sizeof(hdr);
1349
1350         for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
1351                 if ((msg->sender_pid == hdr.pid) &&
1352                     (msg->sender_sock == hdr.sock)) {
1353                         break;
1354                 }
1355         }
1356
1357         if ((msg != NULL) && (msg->cookie != cookie)) {
1358                 TALLOC_FREE(msg);
1359         }
1360
1361         if (msg == NULL) {
1362                 size_t msglen;
1363                 msglen = offsetof(struct messaging_dgm_in_msg, buf) +
1364                         hdr.msglen;
1365
1366                 msg = talloc_size(ctx, msglen);
1367                 if (msg == NULL) {
1368                         goto close_fds;
1369                 }
1370                 talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
1371
1372                 *msg = (struct messaging_dgm_in_msg) {
1373                         .ctx = ctx, .msglen = hdr.msglen,
1374                         .sender_pid = hdr.pid, .sender_sock = hdr.sock,
1375                         .cookie = cookie
1376                 };
1377                 DLIST_ADD(ctx->in_msgs, msg);
1378                 talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
1379         }
1380
1381         space = msg->msglen - msg->received;
1382         if (buflen > space) {
1383                 goto close_fds;
1384         }
1385
1386         memcpy(msg->buf + msg->received, buf, buflen);
1387         msg->received += buflen;
1388
1389         if (msg->received < msg->msglen) {
1390                 /*
1391                  * Any valid sender will send the fds in the last
1392                  * block. Invalid senders might have sent fd's that we
1393                  * need to close here.
1394                  */
1395                 goto close_fds;
1396         }
1397
1398         DLIST_REMOVE(ctx->in_msgs, msg);
1399         talloc_set_destructor(msg, NULL);
1400
1401         ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
1402                      ctx->recv_cb_private_data);
1403
1404         TALLOC_FREE(msg);
1405         return;
1406
1407 close_fds:
1408         close_fd_array(fds, num_fds);
1409 }
1410
1411 void messaging_dgm_destroy(void)
1412 {
1413         TALLOC_FREE(global_dgm_context);
1414 }
1415
1416 int messaging_dgm_send(pid_t pid,
1417                        const struct iovec *iov, int iovlen,
1418                        const int *fds, size_t num_fds)
1419 {
1420         struct messaging_dgm_context *ctx = global_dgm_context;
1421         struct messaging_dgm_out *out;
1422         int ret;
1423         unsigned retries = 0;
1424
1425         if (ctx == NULL) {
1426                 return ENOTCONN;
1427         }
1428
1429         messaging_dgm_validate(ctx);
1430
1431 again:
1432         ret = messaging_dgm_out_get(ctx, pid, &out);
1433         if (ret != 0) {
1434                 return ret;
1435         }
1436
1437         DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1438
1439         ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1440                                                 fds, num_fds);
1441         if (ret == ECONNREFUSED) {
1442                 /*
1443                  * We cache outgoing sockets. If the receiver has
1444                  * closed and re-opened the socket since our last
1445                  * message, we get connection refused. Retry.
1446                  */
1447
1448                 TALLOC_FREE(out);
1449
1450                 if (retries < 5) {
1451                         retries += 1;
1452                         goto again;
1453                 }
1454         }
1455         return ret;
1456 }
1457
1458 static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1459 {
1460         char buf[25];
1461         ssize_t rw_ret;
1462         int error = 0;
1463         unsigned long long unique;
1464         char *endptr;
1465
1466         rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1467         if (rw_ret == -1) {
1468                 return errno;
1469         }
1470         buf[rw_ret] = '\0';
1471
1472         unique = strtoull_err(buf, &endptr, 10, &error);
1473         if (error != 0) {
1474                 return error;
1475         }
1476
1477         if (endptr[0] != '\n') {
1478                 return EINVAL;
1479         }
1480         *punique = unique;
1481         return 0;
1482 }
1483
1484 int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1485 {
1486         struct messaging_dgm_context *ctx = global_dgm_context;
1487         struct sun_path_buf lockfile_name;
1488         int ret, fd;
1489
1490         if (ctx == NULL) {
1491                 return EBADF;
1492         }
1493
1494         messaging_dgm_validate(ctx);
1495
1496         if (pid == getpid()) {
1497                 /*
1498                  * Protect against losing our own lock
1499                  */
1500                 return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
1501         }
1502
1503         ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1504                        "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1505         if (ret < 0) {
1506                 return errno;
1507         }
1508         if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1509                 return ENAMETOOLONG;
1510         }
1511
1512         fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1513         if (fd == -1) {
1514                 return errno;
1515         }
1516
1517         ret = messaging_dgm_read_unique(fd, unique);
1518         close(fd);
1519         return ret;
1520 }
1521
1522 int messaging_dgm_cleanup(pid_t pid)
1523 {
1524         struct messaging_dgm_context *ctx = global_dgm_context;
1525         struct sun_path_buf lockfile_name, socket_name;
1526         int fd, len, ret;
1527         struct flock lck = {};
1528
1529         if (ctx == NULL) {
1530                 return ENOTCONN;
1531         }
1532
1533         len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1534                        ctx->socket_dir.buf, (unsigned)pid);
1535         if (len < 0) {
1536                 return errno;
1537         }
1538         if ((size_t)len >= sizeof(socket_name.buf)) {
1539                 return ENAMETOOLONG;
1540         }
1541
1542         len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1543                        ctx->lockfile_dir.buf, (unsigned)pid);
1544         if (len < 0) {
1545                 return errno;
1546         }
1547         if ((size_t)len >= sizeof(lockfile_name.buf)) {
1548                 return ENAMETOOLONG;
1549         }
1550
1551         fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1552         if (fd == -1) {
1553                 ret = errno;
1554                 if (ret != ENOENT) {
1555                         DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1556                                    lockfile_name.buf, strerror(ret)));
1557                 }
1558                 return ret;
1559         }
1560
1561         lck.l_type = F_WRLCK;
1562         lck.l_whence = SEEK_SET;
1563         lck.l_start = 0;
1564         lck.l_len = 0;
1565
1566         ret = fcntl(fd, F_SETLK, &lck);
1567         if (ret != 0) {
1568                 ret = errno;
1569                 if ((ret != EACCES) && (ret != EAGAIN)) {
1570                         DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
1571                                    strerror(ret)));
1572                 }
1573                 close(fd);
1574                 return ret;
1575         }
1576
1577         DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1578
1579         (void)unlink(socket_name.buf);
1580         (void)unlink(lockfile_name.buf);
1581         (void)close(fd);
1582         return 0;
1583 }
1584
1585 static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
1586 {
1587         pid_t *our_pid = (pid_t *)private_data;
1588         int ret;
1589
1590         if (pid == *our_pid) {
1591                 /*
1592                  * fcntl(F_GETLK) will succeed for ourselves, we hold
1593                  * that lock ourselves.
1594                  */
1595                 return 0;
1596         }
1597
1598         ret = messaging_dgm_cleanup(pid);
1599         DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
1600                    (unsigned long)pid, ret ? strerror(ret) : "ok"));
1601
1602         return 0;
1603 }
1604
1605 int messaging_dgm_wipe(void)
1606 {
1607         pid_t pid = getpid();
1608         messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
1609         return 0;
1610 }
1611
1612 int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1613                          void *private_data)
1614 {
1615         struct messaging_dgm_context *ctx = global_dgm_context;
1616         DIR *msgdir;
1617         struct dirent *dp;
1618         int error = 0;
1619
1620         if (ctx == NULL) {
1621                 return ENOTCONN;
1622         }
1623
1624         messaging_dgm_validate(ctx);
1625
1626         /*
1627          * We scan the socket directory and not the lock directory. Otherwise
1628          * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1629          * and fcntl(SETLK).
1630          */
1631
1632         msgdir = opendir(ctx->socket_dir.buf);
1633         if (msgdir == NULL) {
1634                 return errno;
1635         }
1636
1637         while ((dp = readdir(msgdir)) != NULL) {
1638                 unsigned long pid;
1639                 int ret;
1640
1641                 pid = strtoul_err(dp->d_name, NULL, 10, &error);
1642                 if ((pid == 0) || (error != 0)) {
1643                         /*
1644                          * . and .. and other malformed entries
1645                          */
1646                         continue;
1647                 }
1648
1649                 ret = fn(pid, private_data);
1650                 if (ret != 0) {
1651                         break;
1652                 }
1653         }
1654         closedir(msgdir);
1655
1656         return 0;
1657 }
1658
1659 struct messaging_dgm_fde {
1660         struct tevent_fd *fde;
1661 };
1662
1663 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1664 {
1665         if (fde_ev->ctx != NULL) {
1666                 DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1667                 fde_ev->ctx = NULL;
1668         }
1669         return 0;
1670 }
1671
1672 /*
1673  * Reference counter for a struct tevent_fd messaging read event
1674  * (with callback function) on a struct tevent_context registered
1675  * on a messaging context.
1676  *
1677  * If we've already registered this struct tevent_context before
1678  * (so already have a read event), just increase the reference count.
1679  *
1680  * Otherwise create a new struct tevent_fd messaging read event on the
1681  * previously unseen struct tevent_context - this is what drives
1682  * the message receive processing.
1683  *
1684  */
1685
1686 struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
1687         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
1688 {
1689         struct messaging_dgm_context *ctx = global_dgm_context;
1690         struct messaging_dgm_fde_ev *fde_ev;
1691         struct messaging_dgm_fde *fde;
1692
1693         if (ctx == NULL) {
1694                 return NULL;
1695         }
1696
1697         fde = talloc(mem_ctx, struct messaging_dgm_fde);
1698         if (fde == NULL) {
1699                 return NULL;
1700         }
1701
1702         for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
1703                 if (tevent_fd_get_flags(fde_ev->fde) == 0) {
1704                         /*
1705                          * If the event context got deleted,
1706                          * tevent_fd_get_flags() will return 0
1707                          * for the stale fde.
1708                          *
1709                          * In that case we should not
1710                          * use fde_ev->ev anymore.
1711                          */
1712                         continue;
1713                 }
1714                 if (fde_ev->ev == ev) {
1715                         break;
1716                 }
1717         }
1718
1719         if (fde_ev == NULL) {
1720                 fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
1721                 if (fde_ev == NULL) {
1722                         return NULL;
1723                 }
1724                 fde_ev->fde = tevent_add_fd(
1725                         ev, fde_ev, ctx->sock, TEVENT_FD_READ,
1726                         messaging_dgm_read_handler, ctx);
1727                 if (fde_ev->fde == NULL) {
1728                         TALLOC_FREE(fde);
1729                         return NULL;
1730                 }
1731                 fde_ev->ev = ev;
1732                 fde_ev->ctx = ctx;
1733                 DLIST_ADD(ctx->fde_evs, fde_ev);
1734                 talloc_set_destructor(
1735                         fde_ev, messaging_dgm_fde_ev_destructor);
1736         } else {
1737                 /*
1738                  * Same trick as with tdb_wrap: The caller will never
1739                  * see the talloc_referenced object, the
1740                  * messaging_dgm_fde_ev, so problems with
1741                  * talloc_unlink will not happen.
1742                  */
1743                 if (talloc_reference(fde, fde_ev) == NULL) {
1744                         TALLOC_FREE(fde);
1745                         return NULL;
1746                 }
1747         }
1748
1749         fde->fde = fde_ev->fde;
1750         return fde;
1751 }
1752
1753 bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1754 {
1755         uint16_t flags;
1756
1757         if (fde == NULL) {
1758                 return false;
1759         }
1760         flags = tevent_fd_get_flags(fde->fde);
1761         return (flags != 0);
1762 }