#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
+#include "system/select.h"
#include "lib/util/debug.h"
-#include "lib/unix_msg/unix_msg.h"
#include "lib/messages_dgm.h"
-#include "poll_funcs/poll_funcs_tevent.h"
#include "lib/util/genrand.h"
+#include "lib/util/dlinklist.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
+#include "lib/util/msghdr.h"
+#include "lib/util/iov_buf.h"
+#include "lib/util/blocking.h"
+#include "lib/util/tevent_unix.h"
+
+#define MESSAGING_DGM_FRAGMENT_LENGTH 1024
struct sun_path_buf {
/*
char buf[sizeof(struct sockaddr_un)];
};
+struct messaging_dgm_context;
+
+struct messaging_dgm_out {
+ struct messaging_dgm_out *prev, *next;
+ struct messaging_dgm_context *ctx;
+
+ pid_t pid;
+ int sock;
+ bool is_blocking;
+ uint64_t cookie;
+
+ struct tevent_queue *queue;
+ struct tevent_timer *idle_timer;
+};
+
+struct messaging_dgm_in_msg {
+ struct messaging_dgm_in_msg *prev, *next;
+ struct messaging_dgm_context *ctx;
+ size_t msglen;
+ size_t received;
+ pid_t sender_pid;
+ int sender_sock;
+ uint64_t cookie;
+ uint8_t buf[];
+};
+
struct messaging_dgm_context {
+ struct tevent_context *ev;
pid_t pid;
- struct poll_funcs *msg_callbacks;
- void *tevent_handle;
- struct unix_msg_ctx *dgm_ctx;
struct sun_path_buf socket_dir;
struct sun_path_buf lockfile_dir;
int lockfile_fd;
+ int sock;
+ struct tevent_fd *read_fde;
+ struct messaging_dgm_in_msg *in_msgs;
+
void (*recv_cb)(const uint8_t *msg,
size_t msg_len,
int *fds,
void *recv_cb_private_data;
bool *have_dgm_context;
+
+ struct pthreadpool_tevent *pool;
+ struct messaging_dgm_out *outsocks;
};
-static struct messaging_dgm_context *global_dgm_context;
+/* Set socket close on exec. */
+static int prepare_socket_cloexec(int sock)
+{
+#ifdef FD_CLOEXEC
+ int flags;
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data);
+ flags = fcntl(sock, F_GETFD, 0);
+ if (flags == -1) {
+ return errno;
+ }
+ flags |= FD_CLOEXEC;
+ if (fcntl(sock, F_SETFD, flags) == -1) {
+ return errno;
+ }
+#endif
+ return 0;
+}
+
+static void close_fd_array(int *fds, size_t num_fds)
+{
+ size_t i;
+
+ for (i = 0; i < num_fds; i++) {
+ if (fds[i] == -1) {
+ continue;
+ }
+
+ close(fds[i]);
+ fds[i] = -1;
+ }
+}
+
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval current_time,
+ void *private_data)
+{
+ struct messaging_dgm_out *out = talloc_get_type_abort(
+ private_data, struct messaging_dgm_out);
+ size_t qlen;
+
+ out->idle_timer = NULL;
+
+ qlen = tevent_queue_length(out->queue);
+ if (qlen == 0) {
+ TALLOC_FREE(out);
+ }
+}
+
+static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
+{
+ size_t qlen;
+
+ qlen = tevent_queue_length(out->queue);
+ if (qlen != 0) {
+ TALLOC_FREE(out->idle_timer);
+ return;
+ }
+
+ if (out->idle_timer != NULL) {
+ tevent_update_timer(out->idle_timer,
+ tevent_timeval_current_ofs(1, 0));
+ return;
+ }
+
+ out->idle_timer = tevent_add_timer(
+ out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
+ messaging_dgm_out_idle_handler, out);
+ /*
+ * No NULL check, we'll come back here. Worst case we're
+ * leaking a bit.
+ */
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval current_time,
+ void *private_data);
+
+static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
+ struct messaging_dgm_context *ctx,
+ pid_t pid, struct messaging_dgm_out **pout)
+{
+ struct messaging_dgm_out *out;
+ struct sockaddr_un addr = { .sun_family = AF_UNIX };
+ int ret = ENOMEM;
+ int out_pathlen;
+
+ out = talloc(mem_ctx, struct messaging_dgm_out);
+ if (out == NULL) {
+ goto fail;
+ }
+
+ *out = (struct messaging_dgm_out) {
+ .pid = pid,
+ .ctx = ctx,
+ .cookie = 1
+ };
+
+ out_pathlen = snprintf(addr.sun_path, sizeof(addr.sun_path),
+ "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
+ if (out_pathlen < 0) {
+ goto errno_fail;
+ }
+ if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
+ ret = ENAMETOOLONG;
+ goto fail;
+ }
+
+ out->queue = tevent_queue_create(out, addr.sun_path);
+ if (out->queue == NULL) {
+ ret = ENOMEM;
+ goto fail;
+ }
+
+ out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if (out->sock == -1) {
+ goto errno_fail;
+ }
+
+ DLIST_ADD(ctx->outsocks, out);
+ talloc_set_destructor(out, messaging_dgm_out_destructor);
+
+ do {
+ ret = connect(out->sock,
+ (const struct sockaddr *)(const void *)&addr,
+ sizeof(addr));
+ } while ((ret == -1) && (errno == EINTR));
+
+ if (ret == -1) {
+ goto errno_fail;
+ }
+
+ ret = set_blocking(out->sock, false);
+ if (ret == -1) {
+ goto errno_fail;
+ }
+ out->is_blocking = false;
+
+ *pout = out;
+ return 0;
+errno_fail:
+ ret = errno;
+fail:
+ TALLOC_FREE(out);
+ return ret;
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
+{
+ DLIST_REMOVE(out->ctx->outsocks, out);
+
+ if (tevent_queue_length(out->queue) != 0) {
+ /*
+ * We have pending jobs. We can't close the socket,
+ * this has been handed over to messaging_dgm_out_queue_state.
+ */
+ return 0;
+ }
+
+ if (out->sock != -1) {
+ close(out->sock);
+ out->sock = -1;
+ }
+ return 0;
+}
+
+static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
+ struct messaging_dgm_out **pout)
+{
+ struct messaging_dgm_out *out;
+ int ret;
+
+ for (out = ctx->outsocks; out != NULL; out = out->next) {
+ if (out->pid == pid) {
+ break;
+ }
+ }
+
+ if (out == NULL) {
+ ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
+ if (ret != 0) {
+ return ret;
+ }
+ }
+
+ messaging_dgm_out_rearm_idle_timer(out);
+
+ *pout = out;
+ return 0;
+}
+
+static ssize_t messaging_dgm_sendmsg(int sock,
+ const struct iovec *iov, int iovlen,
+ const int *fds, size_t num_fds,
+ int *perrno)
+{
+ struct msghdr msg;
+ ssize_t fdlen, ret;
+
+ /*
+ * Do the actual sendmsg syscall. This will be called from a
+ * pthreadpool helper thread, so be careful what you do here.
+ */
+
+ msg = (struct msghdr) {
+ .msg_iov = discard_const_p(struct iovec, iov),
+ .msg_iovlen = iovlen
+ };
+
+ fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
+ if (fdlen == -1) {
+ *perrno = EINVAL;
+ return -1;
+ }
+
+ {
+ uint8_t buf[fdlen];
+
+ msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
+
+ do {
+ ret = sendmsg(sock, &msg, MSG_NOSIGNAL);
+ } while ((ret == -1) && (errno == EINTR));
+ }
+
+ if (ret == -1) {
+ *perrno = errno;
+ }
+ return ret;
+}
+
+struct messaging_dgm_out_queue_state {
+ struct tevent_context *ev;
+ struct pthreadpool_tevent *pool;
+
+ struct tevent_req *req;
+ struct tevent_req *subreq;
+
+ int sock;
+
+ int *fds;
+ uint8_t *buf;
+
+ ssize_t sent;
+ int err;
+};
+
+static int messaging_dgm_out_queue_state_destructor(
+ struct messaging_dgm_out_queue_state *state);
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+ void *private_data);
+static void messaging_dgm_out_threaded_job(void *private_data);
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
+
+static struct tevent_req *messaging_dgm_out_queue_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct messaging_dgm_out *out,
+ const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+ struct tevent_req *req;
+ struct messaging_dgm_out_queue_state *state;
+ struct tevent_queue_entry *e;
+ size_t i;
+ ssize_t buflen;
+
+ req = tevent_req_create(out, &state,
+ struct messaging_dgm_out_queue_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->pool = out->ctx->pool;
+ state->sock = out->sock;
+ state->req = req;
+
+ /*
+ * Go blocking in a thread
+ */
+ if (!out->is_blocking) {
+ int ret = set_blocking(out->sock, true);
+ if (ret == -1) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
+ }
+ out->is_blocking = true;
+ }
+
+ buflen = iov_buflen(iov, iovlen);
+ if (buflen == -1) {
+ tevent_req_error(req, EMSGSIZE);
+ return tevent_req_post(req, ev);
+ }
+
+ state->buf = talloc_array(state, uint8_t, buflen);
+ if (tevent_req_nomem(state->buf, req)) {
+ return tevent_req_post(req, ev);
+ }
+ iov_buf(iov, iovlen, state->buf, buflen);
+
+ state->fds = talloc_array(state, int, num_fds);
+ if (tevent_req_nomem(state->fds, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ for (i=0; i<num_fds; i++) {
+ state->fds[i] = -1;
+ }
+
+ for (i=0; i<num_fds; i++) {
+
+ state->fds[i] = dup(fds[i]);
+
+ if (state->fds[i] == -1) {
+ int ret = errno;
+
+ close_fd_array(state->fds, num_fds);
+
+ tevent_req_error(req, ret);
+ return tevent_req_post(req, ev);
+ }
+ }
+
+ talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
+
+ e = tevent_queue_add_entry(out->queue, ev, req,
+ messaging_dgm_out_queue_trigger, req);
+ if (tevent_req_nomem(e, req)) {
+ return tevent_req_post(req, ev);
+ }
+ return req;
+}
+
+static int messaging_dgm_out_queue_state_destructor(
+ struct messaging_dgm_out_queue_state *state)
+{
+ int *fds;
+ size_t num_fds;
+
+ if (state->subreq != NULL) {
+ /*
+ * We're scheduled, but we're destroyed. This happens
+ * if the messaging_dgm_context is destroyed while
+ * we're stuck in a blocking send. There's nothing we
+ * can do but to leak memory.
+ */
+ TALLOC_FREE(state->subreq);
+ (void)talloc_reparent(state->req, NULL, state);
+ return -1;
+ }
+
+ fds = state->fds;
+ num_fds = talloc_array_length(fds);
+ close_fd_array(fds, num_fds);
+ return 0;
+}
+
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+ void *private_data)
+{
+ struct messaging_dgm_out_queue_state *state = tevent_req_data(
+ req, struct messaging_dgm_out_queue_state);
+
+ state->subreq = pthreadpool_tevent_job_send(
+ state, state->ev, state->pool,
+ messaging_dgm_out_threaded_job, state);
+ if (tevent_req_nomem(state->subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
+ req);
+}
+
+static void messaging_dgm_out_threaded_job(void *private_data)
+{
+ struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
+ private_data, struct messaging_dgm_out_queue_state);
+
+ struct iovec iov = { .iov_base = state->buf,
+ .iov_len = talloc_get_size(state->buf) };
+ size_t num_fds = talloc_array_length(state->fds);
+
+ state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
+ state->fds, num_fds, &state->err);
+}
+
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct messaging_dgm_out_queue_state *state = tevent_req_data(
+ req, struct messaging_dgm_out_queue_state);
+ int ret;
+
+ if (subreq != state->subreq) {
+ abort();
+ }
+
+ ret = pthreadpool_tevent_job_recv(subreq);
+
+ TALLOC_FREE(subreq);
+ state->subreq = NULL;
+
+ if (tevent_req_error(req, ret)) {
+ return;
+ }
+ if (state->sent == -1) {
+ tevent_req_error(req, state->err);
+ return;
+ }
+ tevent_req_done(req);
+}
+
+static int messaging_dgm_out_queue_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
+
+static int messaging_dgm_out_send_fragment(
+ struct tevent_context *ev, struct messaging_dgm_out *out,
+ const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+ struct tevent_req *req;
+ size_t qlen;
+
+ qlen = tevent_queue_length(out->queue);
+ if (qlen == 0) {
+ ssize_t nsent;
+ int err = 0;
+
+ if (out->is_blocking) {
+ int ret = set_blocking(out->sock, false);
+ if (ret == -1) {
+ return errno;
+ }
+ out->is_blocking = false;
+ }
+
+ nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
+ num_fds, &err);
+ if (nsent >= 0) {
+ return 0;
+ }
+
+ if (err != EWOULDBLOCK) {
+ return err;
+ }
+ }
+
+ req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
+ fds, num_fds);
+ if (req == NULL) {
+ return ENOMEM;
+ }
+ tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
+
+ return 0;
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
+{
+ struct messaging_dgm_out *out = tevent_req_callback_data(
+ req, struct messaging_dgm_out);
+ int ret;
+
+ ret = messaging_dgm_out_queue_recv(req);
+ TALLOC_FREE(req);
+
+ if (ret != 0) {
+ DBG_WARNING("messaging_out_queue_recv returned %s\n",
+ strerror(ret));
+ }
+
+ messaging_dgm_out_rearm_idle_timer(out);
+}
+
+
+struct messaging_dgm_fragment_hdr {
+ size_t msglen;
+ pid_t pid;
+ int sock;
+};
+
+static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
+ struct messaging_dgm_out *out,
+ const struct iovec *iov,
+ int iovlen,
+ const int *fds, size_t num_fds)
+{
+ ssize_t msglen, sent;
+ int ret = 0;
+ struct iovec iov_copy[iovlen+2];
+ struct messaging_dgm_fragment_hdr hdr;
+ struct iovec src_iov;
+
+ if (iovlen < 0) {
+ return EINVAL;
+ }
+
+ msglen = iov_buflen(iov, iovlen);
+ if (msglen == -1) {
+ return EMSGSIZE;
+ }
+ if (num_fds > INT8_MAX) {
+ return EINVAL;
+ }
+
+ if ((size_t) msglen <=
+ (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
+ uint64_t cookie = 0;
+
+ iov_copy[0].iov_base = &cookie;
+ iov_copy[0].iov_len = sizeof(cookie);
+ if (iovlen > 0) {
+ memcpy(&iov_copy[1], iov,
+ sizeof(struct iovec) * iovlen);
+ }
+
+ return messaging_dgm_out_send_fragment(
+ ev, out, iov_copy, iovlen+1, fds, num_fds);
+
+ }
+
+ hdr = (struct messaging_dgm_fragment_hdr) {
+ .msglen = msglen,
+ .pid = getpid(),
+ .sock = out->sock
+ };
+
+ iov_copy[0].iov_base = &out->cookie;
+ iov_copy[0].iov_len = sizeof(out->cookie);
+ iov_copy[1].iov_base = &hdr;
+ iov_copy[1].iov_len = sizeof(hdr);
+
+ sent = 0;
+ src_iov = iov[0];
+
+ /*
+ * The following write loop sends the user message in pieces. We have
+ * filled the first two iovecs above with "cookie" and "hdr". In the
+ * following loops we pull message chunks from the user iov array and
+ * fill iov_copy piece by piece, possibly truncating chunks from the
+ * caller's iov array. Ugly, but hopefully efficient.
+ */
+
+ while (sent < msglen) {
+ size_t fragment_len;
+ size_t iov_index = 2;
+
+ fragment_len = sizeof(out->cookie) + sizeof(hdr);
+
+ while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
+ size_t space, chunk;
+
+ space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
+ chunk = MIN(space, src_iov.iov_len);
+
+ iov_copy[iov_index].iov_base = src_iov.iov_base;
+ iov_copy[iov_index].iov_len = chunk;
+ iov_index += 1;
+
+ src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+ src_iov.iov_len -= chunk;
+ fragment_len += chunk;
+
+ if (src_iov.iov_len == 0) {
+ iov += 1;
+ iovlen -= 1;
+ if (iovlen == 0) {
+ break;
+ }
+ src_iov = iov[0];
+ }
+ }
+ sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
+
+ /*
+ * only the last fragment should pass the fd array.
+ * That simplifies the receiver a lot.
+ */
+ if (sent < msglen) {
+ ret = messaging_dgm_out_send_fragment(
+ ev, out, iov_copy, iov_index, NULL, 0);
+ } else {
+ ret = messaging_dgm_out_send_fragment(
+ ev, out, iov_copy, iov_index, fds, num_fds);
+ }
+ if (ret != 0) {
+ break;
+ }
+ }
+
+ out->cookie += 1;
+ if (out->cookie == 0) {
+ out->cookie += 1;
+ }
+
+ return ret;
+}
+
+static struct messaging_dgm_context *global_dgm_context;
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
return ret;
}
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data);
+
int messaging_dgm_init(struct tevent_context *ev,
uint64_t *punique,
const char *socket_dir,
if (ctx == NULL) {
goto fail_nomem;
}
+ ctx->ev = ev;
ctx->pid = getpid();
ctx->recv_cb = recv_cb;
ctx->recv_cb_private_data = recv_cb_private_data;
return ret;
}
- ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
- if (ctx->msg_callbacks == NULL) {
- goto fail_nomem;
- }
+ unlink(socket_address.sun_path);
- ctx->tevent_handle = poll_funcs_tevent_register(
- ctx, ctx->msg_callbacks, ev);
- if (ctx->tevent_handle == NULL) {
- goto fail_nomem;
+ ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if (ctx->sock == -1) {
+ ret = errno;
+ DBG_WARNING("socket failed: %s\n", strerror(ret));
+ TALLOC_FREE(ctx);
+ return ret;
}
- unlink(socket_address.sun_path);
+ ret = prepare_socket_cloexec(ctx->sock);
+ if (ret == -1) {
+ ret = errno;
+ DBG_WARNING("prepare_socket_cloexec failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ctx);
+ return ret;
+ }
- ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024,
- messaging_dgm_recv, ctx, &ctx->dgm_ctx);
- if (ret != 0) {
- DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+ ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
+ sizeof(socket_address));
+ if (ret == -1) {
+ ret = errno;
+ DBG_WARNING("bind failed: %s\n", strerror(ret));
TALLOC_FREE(ctx);
return ret;
}
+
+ ctx->read_fde = tevent_add_fd(ctx->ev, ctx, ctx->sock, TEVENT_FD_READ,
+ messaging_dgm_read_handler, ctx);
+ if (ctx->read_fde == NULL) {
+ goto fail_nomem;
+ }
+
talloc_set_destructor(ctx, messaging_dgm_context_destructor);
ctx->have_dgm_context = &have_dgm_context;
+ ret = pthreadpool_tevent_init(ctx, 0, &ctx->pool);
+ if (ret != 0) {
+ DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ctx);
+ return ret;
+ }
+
global_dgm_context = ctx;
return 0;
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
{
- /*
- * First delete the socket to avoid races. The lockfile is the
- * indicator that we're still around.
- */
- unix_msg_free(c->dgm_ctx);
+ while (c->outsocks != NULL) {
+ TALLOC_FREE(c->outsocks);
+ }
+ while (c->in_msgs != NULL) {
+ TALLOC_FREE(c->in_msgs);
+ }
+
+ TALLOC_FREE(c->read_fde);
+ close(c->sock);
if (getpid() == c->pid) {
struct sun_path_buf name;
int ret;
+ ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
+ c->socket_dir.buf, (unsigned)c->pid);
+ if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
+ /*
+ * We've checked the length when creating, so this
+ * should never happen
+ */
+ abort();
+ }
+ unlink(name.buf);
+
ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
c->lockfile_dir.buf, (unsigned)c->pid);
if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
return 0;
}
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+ uint8_t *msg, size_t msg_len,
+ int *fds, size_t num_fds);
+
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data)
+{
+ struct messaging_dgm_context *ctx = talloc_get_type_abort(
+ private_data, struct messaging_dgm_context);
+ ssize_t received;
+ struct msghdr msg;
+ struct iovec iov;
+ size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
+ uint8_t msgbuf[msgbufsize];
+ uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
+
+ if ((flags & TEVENT_FD_READ) == 0) {
+ return;
+ }
+
+ iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
+ msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
+
+ msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
+
+#ifdef MSG_CMSG_CLOEXEC
+ flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+ received = recvmsg(ctx->sock, &msg, 0);
+ if (received == -1) {
+ if ((errno == EAGAIN) ||
+ (errno == EWOULDBLOCK) ||
+ (errno == EINTR) ||
+ (errno == ENOMEM)) {
+ /* Not really an error - just try again. */
+ return;
+ }
+ /* Problem with the socket. Set it unreadable. */
+ tevent_fd_set_flags(ctx->read_fde, 0);
+ return;
+ }
+
+ if ((size_t)received > sizeof(buf)) {
+ /* More than we expected, not for us */
+ return;
+ }
+
+ {
+ size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
+ size_t i;
+ int fds[num_fds];
+
+ msghdr_extract_fds(&msg, fds, num_fds);
+
+ for (i = 0; i < num_fds; i++) {
+ int err;
+
+ err = prepare_socket_cloexec(fds[i]);
+ if (err != 0) {
+ close_fd_array(fds, num_fds);
+ num_fds = 0;
+ }
+ }
+
+ messaging_dgm_recv(ctx, buf, received, fds, num_fds);
+ }
+
+}
+
+static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
+{
+ DLIST_REMOVE(m->ctx->in_msgs, m);
+ return 0;
+}
+
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+ uint8_t *buf, size_t buflen,
+ int *fds, size_t num_fds)
+{
+ struct messaging_dgm_fragment_hdr hdr;
+ struct messaging_dgm_in_msg *msg;
+ size_t space;
+ uint64_t cookie;
+
+ if (buflen < sizeof(cookie)) {
+ goto close_fds;
+ }
+ memcpy(&cookie, buf, sizeof(cookie));
+ buf += sizeof(cookie);
+ buflen -= sizeof(cookie);
+
+ if (cookie == 0) {
+ ctx->recv_cb(buf, buflen, fds, num_fds,
+ ctx->recv_cb_private_data);
+ return;
+ }
+
+ if (buflen < sizeof(hdr)) {
+ goto close_fds;
+ }
+ memcpy(&hdr, buf, sizeof(hdr));
+ buf += sizeof(hdr);
+ buflen -= sizeof(hdr);
+
+ for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
+ if ((msg->sender_pid == hdr.pid) &&
+ (msg->sender_sock == hdr.sock)) {
+ break;
+ }
+ }
+
+ if ((msg != NULL) && (msg->cookie != cookie)) {
+ TALLOC_FREE(msg);
+ }
+
+ if (msg == NULL) {
+ size_t msglen;
+ msglen = offsetof(struct messaging_dgm_in_msg, buf) +
+ hdr.msglen;
+
+ msg = talloc_size(ctx, msglen);
+ if (msg == NULL) {
+ goto close_fds;
+ }
+ talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
+
+ *msg = (struct messaging_dgm_in_msg) {
+ .ctx = ctx, .msglen = hdr.msglen,
+ .sender_pid = hdr.pid, .sender_sock = hdr.sock,
+ .cookie = cookie
+ };
+ DLIST_ADD(ctx->in_msgs, msg);
+ talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
+ }
+
+ space = msg->msglen - msg->received;
+ if (buflen > space) {
+ goto close_fds;
+ }
+
+ memcpy(msg->buf + msg->received, buf, buflen);
+ msg->received += buflen;
+
+ if (msg->received < msg->msglen) {
+ /*
+ * Any valid sender will send the fds in the last
+ * block. Invalid senders might have sent fd's that we
+ * need to close here.
+ */
+ goto close_fds;
+ }
+
+ DLIST_REMOVE(ctx->in_msgs, msg);
+ talloc_set_destructor(msg, NULL);
+
+ ctx->recv_cb(msg->buf, msg->msglen, fds, num_fds,
+ ctx->recv_cb_private_data);
+
+ TALLOC_FREE(msg);
+ return;
+
+close_fds:
+ close_fd_array(fds, num_fds);
+}
+
void messaging_dgm_destroy(void)
{
TALLOC_FREE(global_dgm_context);
const int *fds, size_t num_fds)
{
struct messaging_dgm_context *ctx = global_dgm_context;
- struct sockaddr_un dst;
- ssize_t dst_pathlen;
+ struct messaging_dgm_out *out;
int ret;
if (ctx == NULL) {
return ENOTCONN;
}
- dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
-
- dst_pathlen = snprintf(dst.sun_path, sizeof(dst.sun_path),
- "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
- if (dst_pathlen < 0) {
- return errno;
- }
- if ((size_t)dst_pathlen >= sizeof(dst.sun_path)) {
- return ENAMETOOLONG;
+ ret = messaging_dgm_out_get(ctx, pid, &out);
+ if (ret != 0) {
+ return ret;
}
DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
- ret = unix_msg_send(ctx->dgm_ctx, &dst, iov, iovlen, fds, num_fds);
-
+ ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
+ fds, num_fds);
return ret;
}
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data)
-{
- struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
- private_data, struct messaging_dgm_context);
-
- dgm_ctx->recv_cb(msg, msg_len, fds, num_fds,
- dgm_ctx->recv_cb_private_data);
-}
-
static int messaging_dgm_read_unique(int fd, uint64_t *punique)
{
char buf[25];
if (ctx == NULL) {
return NULL;
}
- return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);
+ return tevent_add_fd(ev, mem_ctx, ctx->sock, TEVENT_FD_READ,
+ messaging_dgm_read_handler, ctx);
}