2 * Unix SMB/CIFS implementation.
3 * Samba internal messaging functions
4 * Copyright (C) 2013 by Volker Lendecke
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "util/util.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/select.h"
26 #include "lib/util/debug.h"
27 #include "lib/messages_dgm.h"
28 #include "lib/util/genrand.h"
29 #include "lib/util/dlinklist.h"
30 #include "lib/pthreadpool/pthreadpool_tevent.h"
31 #include "lib/util/msghdr.h"
32 #include "lib/util/iov_buf.h"
33 #include "lib/util/blocking.h"
34 #include "lib/util/tevent_unix.h"
/*
 * On-the-wire size of one datagram fragment (64-bit cookie + header +
 * payload share this budget, see messaging_dgm_out_send_fragmented).
 * NOTE(review): this extract elides lines; "buf" below belongs to a
 * struct (sun_path_buf) whose opening line is not visible here.
 */
36 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
40 * This will carry enough for a socket path
42 char buf[sizeof(struct sockaddr_un)];
46 * We can only have one tevent_fd per dgm_context and per
47 * tevent_context. Maintain a list of registered tevent_contexts per
/*
 * One registration of the messaging socket's read event on a
 * particular tevent_context; linked into ctx->fde_evs.
 */
50 struct messaging_dgm_fde_ev {
51 struct messaging_dgm_fde_ev *prev, *next;
54 * Backreference to enable DLIST_REMOVE from our
55 * destructor. Also, set to NULL when the dgm_context dies
56 * before the messaging_dgm_fde_ev.
58 struct messaging_dgm_context *ctx;
/* The event context we registered on, and the read fde itself. */
60 struct tevent_context *ev;
61 struct tevent_fd *fde;
/*
 * A cached outgoing datagram connection to one destination pid.
 * Kept in ctx->outsocks and torn down by a 1-second idle timer once
 * its send queue drains. NOTE(review): members referenced elsewhere
 * (pid, sock, is_blocking, cookie) are elided from this extract.
 */
64 struct messaging_dgm_out {
65 struct messaging_dgm_out *prev, *next;
66 struct messaging_dgm_context *ctx;
/* Queue of pending fragment sends; timer that frees us when idle. */
73 struct tevent_queue *queue;
74 struct tevent_timer *idle_timer;
/*
 * One partially reassembled incoming multi-fragment message,
 * identified by (sender_pid, sender_sock, cookie) in the code below.
 * Listed in ctx->in_msgs; payload bytes are elided from this extract.
 */
77 struct messaging_dgm_in_msg {
78 struct messaging_dgm_in_msg *prev, *next;
79 struct messaging_dgm_context *ctx;
/*
 * Per-process messaging state: the bound datagram socket, lockfile,
 * cached outgoing sockets and partially received messages.
 * NOTE(review): some members used below (pid, sock, lockfile_fd) are
 * elided from this extract.
 */
88 struct messaging_dgm_context {
89 struct tevent_context *ev;
/* Directories containing the per-pid socket and lockfile. */
91 struct sun_path_buf socket_dir;
92 struct sun_path_buf lockfile_dir;
/* Messages still being reassembled, and per-ev fde registrations. */
96 struct messaging_dgm_in_msg *in_msgs;
98 struct messaging_dgm_fde_ev *fde_evs;
/* Invoked once per fully reassembled incoming message. */
99 void (*recv_cb)(struct tevent_context *ev,
105 void *recv_cb_private_data;
/* Points at the static have_dgm_context flag; cleared on destroy. */
107 bool *have_dgm_context;
/* Helper threads for blocking sends; cached outgoing sockets. */
109 struct pthreadpool_tevent *pool;
110 struct messaging_dgm_out *outsocks;
113 /* Set socket close on exec. */
/*
 * Returns 0 on success or an errno value (return statements elided in
 * this extract). Reads the fd flags, then writes them back with
 * FD_CLOEXEC presumably OR-ed in on a line not visible here.
 */
114 static int prepare_socket_cloexec(int sock)
119 flags = fcntl(sock, F_GETFD, 0);
124 if (fcntl(sock, F_SETFD, flags) == -1) {
/*
 * Close every fd in the array. Used on error paths to drop received
 * or duplicated file descriptors.
 */
131 static void close_fd_array(int *fds, size_t num_fds)
135 for (i = 0; i < num_fds; i++) {
146 * The idle handler can free the struct messaging_dgm_out *,
147 * if it's unused (qlen of zero) which closes the socket.
150 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
151 struct tevent_timer *te,
152 struct timeval current_time,
155 struct messaging_dgm_out *out = talloc_get_type_abort(
156 private_data, struct messaging_dgm_out);
/* The timer has fired and is being freed by tevent; forget it. */
159 out->idle_timer = NULL;
/* Only free "out" when nothing is queued (free path elided here). */
161 qlen = tevent_queue_length(out->queue);
168 * Setup the idle handler to fire afer 1 second if the
172 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
176 qlen = tevent_queue_length(out->queue);
/* Queue still busy: drop any pending idle teardown (early return
 * presumably follows on an elided line). */
178 TALLOC_FREE(out->idle_timer);
/* Reuse an existing timer if we have one... */
182 if (out->idle_timer != NULL) {
183 tevent_update_timer(out->idle_timer,
184 tevent_timeval_current_ofs(1, 0));
/* ...otherwise create a fresh one-second idle timer. */
188 out->idle_timer = tevent_add_timer(
189 out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
190 messaging_dgm_out_idle_handler, out);
192 * No NULL check, we'll come back here. Worst case we're
/* Forward declarations: these are defined/used out of order below. */
197 static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
198 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
199 struct tevent_timer *te,
200 struct timeval current_time,
204 * Connect to an existing rendezvous point for another
205 * pid - wrapped inside a struct messaging_dgm_out *.
/*
 * Returns 0 or an errno value; on success *pout carries the new,
 * connected, nonblocking out-socket. Error-path cleanup lines are
 * elided from this extract.
 */
208 static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
209 struct messaging_dgm_context *ctx,
210 pid_t pid, struct messaging_dgm_out **pout)
212 struct messaging_dgm_out *out;
213 struct sockaddr_un addr = { .sun_family = AF_UNIX };
/* Large enough for "<dir>/<pid>" even before the sun_path check. */
216 char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
218 out = talloc(mem_ctx, struct messaging_dgm_out);
223 *out = (struct messaging_dgm_out) {
/* Destination rendezvous path is "<socket_dir>/<pid>". */
229 out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
230 "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
231 if (out_pathlen < 0) {
234 if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
/* Copy including the trailing NUL. */
239 memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
241 out->queue = tevent_queue_create(out, addr.sun_path);
242 if (out->queue == NULL) {
247 out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
248 if (out->sock == -1) {
/* From here on the destructor unlinks us from ctx->outsocks. */
252 DLIST_ADD(ctx->outsocks, out);
253 talloc_set_destructor(out, messaging_dgm_out_destructor);
/* Retry connect() on signal interruption. */
256 ret = connect(out->sock,
257 (const struct sockaddr *)(const void *)&addr,
259 } while ((ret == -1) && (errno == EINTR));
/* Direct sends are attempted nonblocking first. */
265 ret = set_blocking(out->sock, false);
269 out->is_blocking = false;
/*
 * Unlink from ctx->outsocks and close the socket — unless pending
 * queue jobs in a forked child still own it (see comment below).
 */
280 static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
282 DLIST_REMOVE(out->ctx->outsocks, out);
284 if ((tevent_queue_length(out->queue) != 0) &&
285 (getpid() == out->ctx->pid)) {
287 * We have pending jobs. We can't close the socket,
288 * this has been handed over to messaging_dgm_out_queue_state.
293 if (out->sock != -1) {
301 * Find the struct messaging_dgm_out * to talk to pid.
302 * If we don't have one, create it. Set the timer to
303 * delete after 1 sec.
306 static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
307 struct messaging_dgm_out **pout)
309 struct messaging_dgm_out *out;
/* Linear scan of the cached out-sockets; list is expected small. */
312 for (out = ctx->outsocks; out != NULL; out = out->next) {
313 if (out->pid == pid) {
/* Cache miss: create a new connection owned by ctx. */
319 ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
/* Every lookup refreshes the 1-second idle teardown. */
325 messaging_dgm_out_rearm_idle_timer(out);
332 * This function is called directly to send a message fragment
333 * when the outgoing queue is zero, and from a pthreadpool
334 * job thread when messages are being queued (qlen != 0).
335 * Make sure *ONLY* thread-safe functions are called within.
/*
 * Returns the sendmsg() result; on failure the errno value is
 * presumably stored through an out-parameter (elided in this extract).
 */
338 static ssize_t messaging_dgm_sendmsg(int sock,
339 const struct iovec *iov, int iovlen,
340 const int *fds, size_t num_fds,
347 * Do the actual sendmsg syscall. This will be called from a
348 * pthreadpool helper thread, so be careful what you do here.
351 msg = (struct msghdr) {
352 .msg_iov = discard_const_p(struct iovec, iov),
/* First call sizes the SCM_RIGHTS control buffer, second fills it. */
356 fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
365 msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
/* Retry on signal interruption. */
368 ret = sendmsg(sock, &msg, 0);
369 } while ((ret == -1) && (errno == EINTR));
/*
 * Async state for one queued fragment send: owns copies of the
 * payload and dup'ed fds (members elided here), and tracks the
 * pthreadpool subrequest doing the blocking sendmsg().
 */
378 struct messaging_dgm_out_queue_state {
379 struct tevent_context *ev;
380 struct pthreadpool_tevent *pool;
/* Backpointers between the tevent_req and its threadpool subreq. */
382 struct tevent_req *req;
383 struct tevent_req *subreq;
/* Forward declarations for the queue send state machine. */
394 static int messaging_dgm_out_queue_state_destructor(
395 struct messaging_dgm_out_queue_state *state);
396 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
398 static void messaging_dgm_out_threaded_job(void *private_data);
399 static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
402 * Push a message fragment onto a queue to be sent by a
403 * threadpool job. Makes copies of data/fd's to be sent.
404 * The running tevent_queue internally creates an immediate
405 * event to schedule the write.
408 static struct tevent_req *messaging_dgm_out_queue_send(
409 TALLOC_CTX *mem_ctx, struct tevent_context *ev,
410 struct messaging_dgm_out *out,
411 const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
413 struct tevent_req *req;
414 struct messaging_dgm_out_queue_state *state;
415 struct tevent_queue_entry *e;
/* NOTE(review): req is parented to "out", not mem_ctx — the out
 * socket owns in-flight sends. */
419 req = tevent_req_create(out, &state,
420 struct messaging_dgm_out_queue_state);
425 state->pool = out->ctx->pool;
426 state->sock = out->sock;
430 * Go blocking in a thread
432 if (!out->is_blocking) {
433 int ret = set_blocking(out->sock, true);
435 tevent_req_error(req, errno);
436 return tevent_req_post(req, ev);
438 out->is_blocking = true;
/* Flatten the caller's iovec into one private copy. */
441 buflen = iov_buflen(iov, iovlen);
443 tevent_req_error(req, EMSGSIZE);
444 return tevent_req_post(req, ev);
447 state->buf = talloc_array(state, uint8_t, buflen);
448 if (tevent_req_nomem(state->buf, req)) {
449 return tevent_req_post(req, ev);
451 iov_buf(iov, iovlen, state->buf, buflen);
453 state->fds = talloc_array(state, int, num_fds);
454 if (tevent_req_nomem(state->fds, req)) {
455 return tevent_req_post(req, ev);
/* Pre-mark all slots (presumably as -1, elided), then dup each fd so
 * the job thread owns its own copies. */
458 for (i=0; i<num_fds; i++) {
462 for (i=0; i<num_fds; i++) {
464 state->fds[i] = dup(fds[i]);
466 if (state->fds[i] == -1) {
/* Close whatever we managed to dup so far. */
469 close_fd_array(state->fds, num_fds);
471 tevent_req_error(req, ret);
472 return tevent_req_post(req, ev);
/* From here the destructor must release buf/fds on teardown. */
476 talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
478 e = tevent_queue_add_entry(out->queue, ev, req,
479 messaging_dgm_out_queue_trigger, req);
480 if (tevent_req_nomem(e, req)) {
481 return tevent_req_post(req, ev);
/*
 * Tear down a queued send. If a threadpool job is still in flight we
 * deliberately leak the state (reparent to NULL) rather than free
 * memory a worker thread may still touch.
 */
486 static int messaging_dgm_out_queue_state_destructor(
487 struct messaging_dgm_out_queue_state *state)
492 if (state->subreq != NULL) {
494 * We're scheduled, but we're destroyed. This happens
495 * if the messaging_dgm_context is destroyed while
496 * we're stuck in a blocking send. There's nothing we
497 * can do but to leak memory.
499 TALLOC_FREE(state->subreq);
500 (void)talloc_reparent(state->req, NULL, state);
/* Release the dup'ed fds we still own. */
505 num_fds = talloc_array_length(fds);
506 close_fd_array(fds, num_fds);
511 * tevent_queue callback that schedules the pthreadpool to actually
512 * send the queued message fragment.
515 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
518 struct messaging_dgm_out_queue_state *state = tevent_req_data(
519 req, struct messaging_dgm_out_queue_state);
/* We're now actively sending; the queue-wait timeout no longer
 * applies. */
521 tevent_req_reset_endtime(req);
523 state->subreq = pthreadpool_tevent_job_send(
524 state, state->ev, state->pool,
525 messaging_dgm_out_threaded_job, state);
526 if (tevent_req_nomem(state->subreq, req)) {
529 tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
534 * Wrapper function run by the pthread that calls
535 * messaging_dgm_sendmsg() to actually do the sendmsg().
538 static void messaging_dgm_out_threaded_job(void *private_data)
540 struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
541 private_data, struct messaging_dgm_out_queue_state);
/* The payload lives in one talloc'ed buffer; its talloc size is the
 * authoritative length. */
543 struct iovec iov = { .iov_base = state->buf,
544 .iov_len = talloc_get_size(state->buf) };
545 size_t num_fds = talloc_array_length(state->fds);
/* Retry loop: result and errno are stored in state for pickup by
 * messaging_dgm_out_queue_done() on the main thread. */
551 state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
552 state->fds, num_fds, &state->err);
554 if (state->sent != -1) {
557 if (state->err != ENOBUFS) {
562 * ENOBUFS is the FreeBSD way of saying "Try
563 * again". We have to do polling.
/* Sleep via poll(NULL,0,msec); retry on EINTR. */
566 ret = poll(NULL, 0, msec);
567 } while ((ret == -1) && (errno == EINTR));
570 * Exponential backoff up to once a second
573 msec = MIN(msec, 1000);
578 * Pickup the results of the pthread sendmsg().
581 static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
583 struct tevent_req *req = tevent_req_callback_data(
584 subreq, struct tevent_req);
585 struct messaging_dgm_out_queue_state *state = tevent_req_data(
586 req, struct messaging_dgm_out_queue_state);
/* Sanity: the finishing subreq must be the one we scheduled. */
589 if (subreq != state->subreq) {
593 ret = pthreadpool_tevent_job_recv(subreq);
596 state->subreq = NULL;
/* First report threadpool errors, then the sendmsg() errno. */
598 if (tevent_req_error(req, ret)) {
601 if (state->sent == -1) {
602 tevent_req_error(req, state->err);
605 tevent_req_done(req);
/* Collect the queued-send result: 0 or an errno value. */
608 static int messaging_dgm_out_queue_recv(struct tevent_req *req)
610 return tevent_req_simple_recv_unix(req);
613 static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
616 * Core function to send a message fragment given a
617 * connected struct messaging_dgm_out * destination.
618 * If no current queue tries to send nonblocking
619 * directly. If not, queues the fragment (which makes
620 * a copy of it) and adds a 60-second timeout on the send.
623 static int messaging_dgm_out_send_fragment(
624 struct tevent_context *ev, struct messaging_dgm_out *out,
625 const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
627 struct tevent_req *req;
/* Fast path only when nothing is already queued (ordering!). */
631 qlen = tevent_queue_length(out->queue);
636 if (out->is_blocking) {
637 int ret = set_blocking(out->sock, false);
641 out->is_blocking = false;
644 nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
650 if (err == ENOBUFS) {
652 * FreeBSD's way of telling us the dst socket
653 * is full. EWOULDBLOCK makes us spawn a
654 * polling helper thread.
/* Any error other than "would block" is fatal for this send. */
659 if (err != EWOULDBLOCK) {
/* Slow path: copy the fragment and hand it to the send queue. */
664 req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
669 tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
671 ok = tevent_req_set_endtime(req, ev,
672 tevent_timeval_current_ofs(60, 0));
682 * Pickup the result of the fragment send. Reset idle timer
686 static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
688 struct messaging_dgm_out *out = tevent_req_callback_data(
689 req, struct messaging_dgm_out);
692 ret = messaging_dgm_out_queue_recv(req);
/* Best effort: failures are logged, not propagated. */
696 DBG_WARNING("messaging_out_queue_recv returned %s\n",
700 messaging_dgm_out_rearm_idle_timer(out);
/* Per-fragment header following the cookie; members (msglen, pid,
 * sock per the reassembly code below) are elided in this extract. */
704 struct messaging_dgm_fragment_hdr {
711 * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
712 * size chunks and send it.
714 * Message fragments are prefixed by a 64-bit cookie that
715 * stays the same for all fragments. This allows the receiver
716 * to recognise fragments of the same message and re-assemble
717 * them on the other end.
719 * Note that this allows other message fragments from other
720 * senders to be interleaved in the receive read processing,
721 * the combination of the cookie and header info allows unique
722 * identification of the message from a specific sender in
725 * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
726 * then send a single message with cookie set to zero.
728 * Otherwise the message is fragmented into chunks and added
729 * to the sending queue. Any file descriptors are passed only
730 * in the last fragment.
732 * Finally the cookie is incremented (wrap over zero) to
733 * prepare for the next message sent to this channel.
737 static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
738 struct messaging_dgm_out *out,
739 const struct iovec *iov,
741 const int *fds, size_t num_fds)
743 ssize_t msglen, sent;
/* VLA: caller's iovecs plus 2 slots for cookie and header. */
745 struct iovec iov_copy[iovlen+2];
746 struct messaging_dgm_fragment_hdr hdr;
747 struct iovec src_iov;
753 msglen = iov_buflen(iov, iovlen);
/* fd count must fit the receiver's INT8_MAX SCM_RIGHTS budget. */
757 if (num_fds > INT8_MAX) {
/* Small-message fast path: single datagram, cookie == 0. */
761 if ((size_t) msglen <=
762 (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
765 iov_copy[0].iov_base = &cookie;
766 iov_copy[0].iov_len = sizeof(cookie);
768 memcpy(&iov_copy[1], iov,
769 sizeof(struct iovec) * iovlen);
772 return messaging_dgm_out_send_fragment(
773 ev, out, iov_copy, iovlen+1, fds, num_fds);
/* Multi-fragment path: every fragment carries cookie + hdr. */
777 hdr = (struct messaging_dgm_fragment_hdr) {
783 iov_copy[0].iov_base = &out->cookie;
784 iov_copy[0].iov_len = sizeof(out->cookie);
785 iov_copy[1].iov_base = &hdr;
786 iov_copy[1].iov_len = sizeof(hdr);
792 * The following write loop sends the user message in pieces. We have
793 * filled the first two iovecs above with "cookie" and "hdr". In the
794 * following loops we pull message chunks from the user iov array and
795 * fill iov_copy piece by piece, possibly truncating chunks from the
796 * caller's iov array. Ugly, but hopefully efficient.
799 while (sent < msglen) {
801 size_t iov_index = 2;
803 fragment_len = sizeof(out->cookie) + sizeof(hdr);
/* Pack caller chunks until the fragment budget is used up. */
805 while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
808 space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
809 chunk = MIN(space, src_iov.iov_len);
811 iov_copy[iov_index].iov_base = src_iov.iov_base;
812 iov_copy[iov_index].iov_len = chunk;
/* Advance the cursor within the current source iovec. */
815 src_iov.iov_base = (char *)src_iov.iov_base + chunk;
816 src_iov.iov_len -= chunk;
817 fragment_len += chunk;
819 if (src_iov.iov_len == 0) {
/* Account only for payload bytes, not cookie/header. */
828 sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
831 * only the last fragment should pass the fd array.
832 * That simplifies the receiver a lot.
835 ret = messaging_dgm_out_send_fragment(
836 ev, out, iov_copy, iov_index, NULL, 0);
838 ret = messaging_dgm_out_send_fragment(
839 ev, out, iov_copy, iov_index, fds, num_fds);
/* Cookie 0 is reserved for unfragmented messages; skip it. */
847 if (out->cookie == 0) {
/* Process-wide singleton; messages_dgm allows one context per process. */
854 static struct messaging_dgm_context *global_dgm_context;
856 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
/*
 * Create and fcntl-lock "<lockfile_dir>/<pid>", writing a random
 * nonzero unique id into it. Returns 0 or an errno value; on success
 * the locked fd is returned via *plockfile_fd. Some error-path lines
 * are elided from this extract.
 */
858 static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
859 pid_t pid, int *plockfile_fd,
864 struct sun_path_buf lockfile_name;
870 ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
871 "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
875 if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
879 /* no O_EXCL, existence check is via the fcntl lock */
881 lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
884 if ((lockfile_fd == -1) &&
885 ((errno == ENXIO) /* Linux */ ||
886 (errno == ENODEV) /* Linux kernel bug */ ||
887 (errno == EOPNOTSUPP) /* FreeBSD */)) {
889 * Huh -- a socket? This might be a stale socket from
890 * an upgrade of Samba. Just unlink and retry, nobody
891 * else is supposed to be here at this time.
893 * Yes, this is racy, but I don't see a way to deal
894 * with this properly.
896 unlink(lockfile_name.buf);
898 lockfile_fd = open(lockfile_name.buf,
899 O_NONBLOCK|O_CREAT|O_WRONLY,
903 if (lockfile_fd == -1) {
905 DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
/* Take the write lock; failure means another live process owns
 * this pid's lockfile. */
909 lck = (struct flock) {
914 ret = fcntl(lockfile_fd, F_SETLK, &lck);
917 DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
922 * Directly using the binary value for
923 * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
924 * violation. But including all of ndr here just for this
925 * seems to be a bit overkill to me. Also, messages_dgm might
926 * be replaced sooner or later by something streams-based,
927 * where unique_id generation will be handled differently.
/* Draw random ids until we avoid the reserved all-ones value. */
931 generate_random_buffer((uint8_t *)&unique, sizeof(unique));
932 } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
934 unique_len = snprintf(buf, sizeof(buf), "%ju\n", (uintmax_t)unique);
936 /* shorten a potentially preexisting file */
938 ret = ftruncate(lockfile_fd, unique_len);
941 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
/* Persist the unique id for messaging_dgm_read_unique(). */
946 written = write(lockfile_fd, buf, unique_len);
947 if (written != unique_len) {
949 DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
953 *plockfile_fd = lockfile_fd;
/* Error path: remove the half-created lockfile. */
958 unlink(lockfile_name.buf);
/* Forward declaration; defined further below. */
964 static void messaging_dgm_read_handler(struct tevent_context *ev,
965 struct tevent_fd *fde,
970 * Create the rendezvous point in the file system
971 * that other processes can use to send messages to
/*
 * Initialize the singleton messaging context: create/lock the
 * lockfile, bind the per-pid unix datagram socket and set up the
 * thread pool. Returns 0 or an errno value. Error-unwind lines are
 * elided from this extract.
 */
975 int messaging_dgm_init(struct tevent_context *ev,
977 const char *socket_dir,
978 const char *lockfile_dir,
979 void (*recv_cb)(struct tevent_context *ev,
985 void *recv_cb_private_data)
987 struct messaging_dgm_context *ctx;
989 struct sockaddr_un socket_address;
/* Only one dgm context per process is allowed. */
991 static bool have_dgm_context = false;
993 if (have_dgm_context) {
997 ctx = talloc_zero(NULL, struct messaging_dgm_context);
1002 ctx->pid = getpid();
1003 ctx->recv_cb = recv_cb;
1004 ctx->recv_cb_private_data = recv_cb_private_data;
1006 len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1007 sizeof(ctx->lockfile_dir.buf));
1008 if (len >= sizeof(ctx->lockfile_dir.buf)) {
1010 return ENAMETOOLONG;
1013 len = strlcpy(ctx->socket_dir.buf, socket_dir,
1014 sizeof(ctx->socket_dir.buf));
1015 if (len >= sizeof(ctx->socket_dir.buf)) {
1017 return ENAMETOOLONG;
/* Our rendezvous point is "<socket_dir>/<pid>". */
1020 socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1021 len = snprintf(socket_address.sun_path,
1022 sizeof(socket_address.sun_path),
1023 "%s/%u", socket_dir, (unsigned)ctx->pid);
1024 if (len >= sizeof(socket_address.sun_path)) {
1026 return ENAMETOOLONG;
/* The fcntl lock on the lockfile proves we own this pid slot. */
1029 ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1032 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1033 __func__, strerror(ret)));
/* Remove any stale socket left over from a previous instance. */
1038 unlink(socket_address.sun_path);
1040 ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1041 if (ctx->sock == -1) {
1043 DBG_WARNING("socket failed: %s\n", strerror(ret));
1048 ret = prepare_socket_cloexec(ctx->sock);
1051 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1057 ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1058 sizeof(socket_address));
1061 DBG_WARNING("bind failed: %s\n", strerror(ret));
1066 talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1068 ctx->have_dgm_context = &have_dgm_context;
/* Unbounded worker threads for blocking sends. */
1070 ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1072 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1078 global_dgm_context = ctx;
1087 * Remove the rendezvous point in the filesystem
1088 * if we're the owner.
1091 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
/* Drop cached out-sockets; their destructors unlink themselves. */
1093 while (c->outsocks != NULL) {
1094 TALLOC_FREE(c->outsocks);
1096 while (c->in_msgs != NULL) {
1097 TALLOC_FREE(c->in_msgs);
/* Detach fde registrations that may outlive us; clear their ctx
 * backpointer so their destructors skip DLIST_REMOVE. */
1099 while (c->fde_evs != NULL) {
1100 tevent_fd_set_flags(c->fde_evs->fde, 0);
1101 c->fde_evs->ctx = NULL;
1102 DLIST_REMOVE(c->fde_evs, c->fde_evs);
/* Only the creating process removes the filesystem artifacts —
 * a forked child must not unlink its parent's rendezvous point. */
1107 if (getpid() == c->pid) {
1108 struct sun_path_buf name;
1111 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1112 c->socket_dir.buf, (unsigned)c->pid);
1113 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1115 * We've checked the length when creating, so this
1116 * should never happen
1122 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1123 c->lockfile_dir.buf, (unsigned)c->pid);
1124 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1126 * We've checked the length when creating, so this
1127 * should never happen
/* Closing the fd also releases the fcntl lock. */
1133 close(c->lockfile_fd);
1135 if (c->have_dgm_context != NULL) {
1136 *c->have_dgm_context = false;
/*
 * Sanity check that this context really belongs to the current
 * process: compares the bound socket name and the lockfile inode
 * against what this pid should own. The failure branches (elided
 * here, presumably abort()) guard against use after fork without
 * reinit_after_fork.
 */
1142 static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
1145 pid_t pid = getpid();
1146 struct sockaddr_storage addr;
1147 socklen_t addrlen = sizeof(addr);
1148 struct sockaddr_un *un_addr;
1149 struct sun_path_buf pathbuf;
1150 struct stat st1, st2;
1154 * Protect against using the wrong messaging context after a
1155 * fork without reinit_after_fork.
1158 ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
1160 DBG_ERR("getsockname failed: %s\n", strerror(errno));
1163 if (addr.ss_family != AF_UNIX) {
1164 DBG_ERR("getsockname returned family %d\n",
1165 (int)addr.ss_family);
1168 un_addr = (struct sockaddr_un *)&addr;
/* Expected socket path for the *current* pid. */
1170 ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
1171 "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
1173 DBG_ERR("snprintf failed: %s\n", strerror(errno));
1176 if ((size_t)ret >= sizeof(pathbuf.buf)) {
1177 DBG_ERR("snprintf returned %d chars\n", (int)ret);
1181 if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
1182 DBG_ERR("sockname wrong: Expected %s, got %s\n",
1183 pathbuf.buf, un_addr->sun_path);
/* Verify the lockfile fd still refers to this pid's lockfile. */
1187 ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
1188 "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
1190 DBG_ERR("snprintf failed: %s\n", strerror(errno));
1193 if ((size_t)ret >= sizeof(pathbuf.buf)) {
1194 DBG_ERR("snprintf returned %d chars\n", (int)ret);
1198 ret = stat(pathbuf.buf, &st1);
1200 DBG_ERR("stat failed: %s\n", strerror(errno));
1203 ret = fstat(ctx->lockfile_fd, &st2);
1205 DBG_ERR("fstat failed: %s\n", strerror(errno));
/* dev/ino mismatch means the lockfile was replaced under us. */
1209 if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
1210 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
1211 (int)st2.st_dev, (int)st2.st_ino,
1212 (int)st1.st_dev, (int)st1.st_ino);
/* Forward declaration; reassembly is implemented further below. */
1224 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1225 struct tevent_context *ev,
1226 uint8_t *msg, size_t msg_len,
1227 int *fds, size_t num_fds);
1230 * Raw read callback handler - passes to messaging_dgm_recv()
1231 * for fragment reassembly processing.
1234 static void messaging_dgm_read_handler(struct tevent_context *ev,
1235 struct tevent_fd *fde,
1239 struct messaging_dgm_context *ctx = talloc_get_type_abort(
1240 private_data, struct messaging_dgm_context);
/* VLA sized for up to INT8_MAX passed fds in ancillary data. */
1244 size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
1245 uint8_t msgbuf[msgbufsize];
1246 uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
1249 messaging_dgm_validate(ctx);
1251 if ((flags & TEVENT_FD_READ) == 0) {
1255 iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
1256 msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
1258 msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
/* Ask the kernel to mark received fds close-on-exec atomically
 * where supported. */
1260 #ifdef MSG_CMSG_CLOEXEC
1261 msg.msg_flags |= MSG_CMSG_CLOEXEC;
1264 received = recvmsg(ctx->sock, &msg, 0);
1265 if (received == -1) {
1266 if ((errno == EAGAIN) ||
1267 (errno == EWOULDBLOCK) ||
1269 (errno == ENOMEM)) {
1270 /* Not really an error - just try again. */
1273 /* Problem with the socket. Set it unreadable. */
1274 tevent_fd_set_flags(fde, 0);
1278 if ((size_t)received > sizeof(buf)) {
1279 /* More than we expected, not for us */
1283 num_fds = msghdr_extract_fds(&msg, NULL, 0);
/* No fds attached: deliver payload only. */
1287 messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
1292 msghdr_extract_fds(&msg, fds, num_fds);
/* Ensure CLOEXEC on each received fd (fallback path). */
1294 for (i = 0; i < num_fds; i++) {
1297 err = prepare_socket_cloexec(fds[i]);
1299 close_fd_array(fds, num_fds);
1304 messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
/* Unlink a partial message from its context's reassembly list. */
1308 static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
1310 DLIST_REMOVE(m->ctx->in_msgs, m);
1315 * Deal with identification of fragmented messages and
1316 * re-assembly into full messages sent, then calls the
1320 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1321 struct tevent_context *ev,
1322 uint8_t *buf, size_t buflen,
1323 int *fds, size_t num_fds)
1325 struct messaging_dgm_fragment_hdr hdr;
1326 struct messaging_dgm_in_msg *msg;
/* Every datagram starts with the 64-bit cookie. */
1330 if (buflen < sizeof(cookie)) {
1333 memcpy(&cookie, buf, sizeof(cookie));
1334 buf += sizeof(cookie);
1335 buflen -= sizeof(cookie);
/* Cookie 0 == unfragmented message: deliver directly (the cookie
 * check itself is elided in this extract). */
1338 ctx->recv_cb(ev, buf, buflen, fds, num_fds,
1339 ctx->recv_cb_private_data);
/* Fragmented message: parse the per-fragment header. */
1343 if (buflen < sizeof(hdr)) {
1346 memcpy(&hdr, buf, sizeof(hdr));
1348 buflen -= sizeof(hdr);
/* Look up an in-progress reassembly for this sender. */
1350 for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
1351 if ((msg->sender_pid == hdr.pid) &&
1352 (msg->sender_sock == hdr.sock)) {
/* Same sender, different cookie: stale partial message
 * (presumably dropped on an elided line). */
1357 if ((msg != NULL) && (msg->cookie != cookie)) {
/* First fragment: allocate header + payload in one chunk. */
1363 msglen = offsetof(struct messaging_dgm_in_msg, buf) +
1366 msg = talloc_size(ctx, msglen);
1370 talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
1372 *msg = (struct messaging_dgm_in_msg) {
1373 .ctx = ctx, .msglen = hdr.msglen,
1374 .sender_pid = hdr.pid, .sender_sock = hdr.sock,
1377 DLIST_ADD(ctx->in_msgs, msg);
1378 talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
/* Guard against senders overflowing their announced length. */
1381 space = msg->msglen - msg->received;
1382 if (buflen > space) {
1386 memcpy(msg->buf + msg->received, buf, buflen);
1387 msg->received += buflen;
1389 if (msg->received < msg->msglen) {
1391 * Any valid sender will send the fds in the last
1392 * block. Invalid senders might have sent fd's that we
1393 * need to close here.
/* Complete: unlink and deliver; list removal is done manually,
 * so disable the destructor first. */
1398 DLIST_REMOVE(ctx->in_msgs, msg);
1399 talloc_set_destructor(msg, NULL);
1401 ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
1402 ctx->recv_cb_private_data);
/* Common error exit: drop any fds we were handed. */
1408 close_fd_array(fds, num_fds);
/* Tear down the process-wide messaging context. */
1411 void messaging_dgm_destroy(void)
1413 TALLOC_FREE(global_dgm_context);
/*
 * Send a message (optionally with fds) to the given pid via its
 * rendezvous socket. Returns 0 or an errno value. Retries on
 * ECONNREFUSED since cached sockets can go stale (retry loop partly
 * elided in this extract).
 */
1416 int messaging_dgm_send(pid_t pid,
1417 const struct iovec *iov, int iovlen,
1418 const int *fds, size_t num_fds)
1420 struct messaging_dgm_context *ctx = global_dgm_context;
1421 struct messaging_dgm_out *out;
1423 unsigned retries = 0;
1429 messaging_dgm_validate(ctx);
1432 ret = messaging_dgm_out_get(ctx, pid, &out);
1437 DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1439 ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1441 if (ret == ECONNREFUSED) {
1443 * We cache outgoing sockets. If the receiver has
1444 * closed and re-opened the socket since our last
1445 * message, we get connection refused. Retry.
/*
 * Parse the decimal unique id (terminated by '\n') out of a lockfile
 * fd. Returns 0 or an errno value; result via *punique.
 */
1458 static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1463 unsigned long long unique;
/* Leave room for the NUL terminator added after the read. */
1466 rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1472 unique = strtoull_err(buf, &endptr, 10, &error);
/* The id must be followed by exactly a newline. */
1477 if (endptr[0] != '\n') {
/*
 * Read the unique id of a (possibly remote) pid from its lockfile.
 * Returns 0 or an errno value.
 */
1484 int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1486 struct messaging_dgm_context *ctx = global_dgm_context;
1487 struct sun_path_buf lockfile_name;
1494 messaging_dgm_validate(ctx);
1496 if (pid == getpid()) {
1498 * Protect against losing our own lock
1500 return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
/* NOTE(review): pid is printed with %u but cast to (int) here,
 * unlike the (unsigned) casts elsewhere — confirm intent. */
1503 ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1504 "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1508 if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1509 return ENAMETOOLONG;
1512 fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1517 ret = messaging_dgm_read_unique(fd, unique);
/*
 * Remove the socket and lockfile of a dead pid. Only proceeds if we
 * can grab the write lock on its lockfile, i.e. the owner is gone.
 * Returns 0 or an errno value.
 */
1522 int messaging_dgm_cleanup(pid_t pid)
1524 struct messaging_dgm_context *ctx = global_dgm_context;
1525 struct sun_path_buf lockfile_name, socket_name;
1527 struct flock lck = {};
1533 len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1534 ctx->socket_dir.buf, (unsigned)pid);
1538 if ((size_t)len >= sizeof(socket_name.buf)) {
1539 return ENAMETOOLONG;
1542 len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1543 ctx->lockfile_dir.buf, (unsigned)pid);
1547 if ((size_t)len >= sizeof(lockfile_name.buf)) {
1548 return ENAMETOOLONG;
1551 fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1554 if (ret != ENOENT) {
1555 DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1556 lockfile_name.buf, strerror(ret)));
/* Try to take the owner's lock; EACCES/EAGAIN means it's alive. */
1561 lck.l_type = F_WRLCK;
1562 lck.l_whence = SEEK_SET;
1566 ret = fcntl(fd, F_SETLK, &lck);
1569 if ((ret != EACCES) && (ret != EAGAIN)) {
1570 DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
/* Lock acquired: the pid is dead, remove its artifacts. */
1577 DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1579 (void)unlink(socket_name.buf);
1580 (void)unlink(lockfile_name.buf);
/*
 * Per-pid callback for messaging_dgm_wipe(): clean up everyone
 * except ourselves (best effort, result only logged).
 */
1585 static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
1587 pid_t *our_pid = (pid_t *)private_data;
1590 if (pid == *our_pid) {
1592 * fcntl(F_GETLK) will succeed for ourselves, we hold
1593 * that lock ourselves.
1598 ret = messaging_dgm_cleanup(pid);
1599 DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
1600 (unsigned long)pid, ret ? strerror(ret) : "ok"));
/* Clean up the rendezvous points of all dead processes. */
1605 int messaging_dgm_wipe(void)
1607 pid_t pid = getpid();
1608 messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
/*
 * Invoke fn for every pid that has a socket in the socket directory.
 * Returns 0 or an errno value (closedir/early-exit lines elided).
 */
1612 int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1615 struct messaging_dgm_context *ctx = global_dgm_context;
1624 messaging_dgm_validate(ctx);
1627 * We scan the socket directory and not the lock directory. Otherwise
1628 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1632 msgdir = opendir(ctx->socket_dir.buf);
1633 if (msgdir == NULL) {
/* Entry names are the decimal pids of the socket owners. */
1637 while ((dp = readdir(msgdir)) != NULL) {
1641 pid = strtoul_err(dp->d_name, NULL, 10, &error);
1642 if ((pid == 0) || (error != 0)) {
1644 * . and .. and other malformed entries
1649 ret = fn(pid, private_data);
/* Caller-visible handle wrapping the shared, refcounted read fde. */
1659 struct messaging_dgm_fde {
1660 struct tevent_fd *fde;
/* Unlink from the context's list unless the context died first
 * (in which case ctx was set to NULL by its destructor). */
1663 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1665 if (fde_ev->ctx != NULL) {
1666 DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1673 * Reference counter for a struct tevent_fd messaging read event
1674 * (with callback function) on a struct tevent_context registered
1675 * on a messaging context.
1677 * If we've already registered this struct tevent_context before
1678 * (so already have a read event), just increase the reference count.
1680 * Otherwise create a new struct tevent_fd messaging read event on the
1681 * previously unseen struct tevent_context - this is what drives
1682 * the message receive processing.
/*
 * Returns a talloc'ed handle (NULL on error) that keeps the read
 * event alive via a talloc reference for as long as any caller
 * holds one.
 */
1686 struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
1687 TALLOC_CTX *mem_ctx, struct tevent_context *ev)
1689 struct messaging_dgm_context *ctx = global_dgm_context;
1690 struct messaging_dgm_fde_ev *fde_ev;
1691 struct messaging_dgm_fde *fde;
1697 fde = talloc(mem_ctx, struct messaging_dgm_fde);
/* Look for an existing, still-live registration on this ev. */
1702 for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
1703 if (tevent_fd_get_flags(fde_ev->fde) == 0) {
1705 * If the event context got deleted,
1706 * tevent_fd_get_flags() will return 0
1707 * for the stale fde.
1709 * In that case we should not
1710 * use fde_ev->ev anymore.
1714 if (fde_ev->ev == ev) {
/* First registration on this ev: create the read event. */
1719 if (fde_ev == NULL) {
1720 fde_ev = talloc(fde, struct messaging_dgm_fde_ev)
1721 if (fde_ev == NULL) {
1724 fde_ev->fde = tevent_add_fd(
1725 ev, fde_ev, ctx->sock, TEVENT_FD_READ,
1726 messaging_dgm_read_handler, ctx);
1727 if (fde_ev->fde == NULL) {
1733 DLIST_ADD(ctx->fde_evs, fde_ev);
1734 talloc_set_destructor(
1735 fde_ev, messaging_dgm_fde_ev_destructor);
1738 * Same trick as with tdb_wrap: The caller will never
1739 * see the talloc_referenced object, the
1740 * messaging_dgm_fde_ev, so problems with
1741 * talloc_unlink will not happen.
1743 if (talloc_reference(fde, fde_ev) == NULL) {
1749 fde->fde = fde_ev->fde;
/*
 * True while the underlying read event is still armed; a zero flags
 * value means the owning tevent_context went away.
 */
1753 bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1760 flags = tevent_fd_get_flags(fde->fde);
1761 return (flags != 0);