messages_dgm: Convert to pthreadpool_tevent
authorVolker Lendecke <vl@samba.org>
Fri, 9 Sep 2016 14:51:00 +0000 (16:51 +0200)
committerJeremy Allison <jra@samba.org>
Tue, 4 Oct 2016 22:06:22 +0000 (00:06 +0200)
This itself adds a lot of code, however it removes the unix_msg library.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/messages_dgm.c
source3/wscript_build

index 3aa110c77f34bee3c8364ecf1bfdf760807b16b3..5f821685a4a032ce4ad23b06fc22eff19498e93c 100644 (file)
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/dir.h"
+#include "system/select.h"
 #include "lib/util/debug.h"
-#include "lib/unix_msg/unix_msg.h"
 #include "lib/messages_dgm.h"
-#include "poll_funcs/poll_funcs_tevent.h"
 #include "lib/util/genrand.h"
+#include "lib/util/dlinklist.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
+#include "lib/util/msghdr.h"
+#include "lib/util/iov_buf.h"
+#include "lib/util/blocking.h"
+#include "lib/util/tevent_unix.h"
+
+#define MESSAGING_DGM_FRAGMENT_LENGTH 1024
 
 struct sun_path_buf {
        /*
@@ -34,15 +41,43 @@ struct sun_path_buf {
        char buf[sizeof(struct sockaddr_un)];
 };
 
+struct messaging_dgm_context;
+
+struct messaging_dgm_out {
+       struct messaging_dgm_out *prev, *next;
+       struct messaging_dgm_context *ctx;
+
+       pid_t pid;
+       int sock;
+       bool is_blocking;
+       uint64_t cookie;
+
+       struct tevent_queue *queue;
+       struct tevent_timer *idle_timer;
+};
+
+struct messaging_dgm_in_msg {
+       struct messaging_dgm_in_msg *prev, *next;
+       struct messaging_dgm_context *ctx;
+       size_t msglen;
+       size_t received;
+       pid_t sender_pid;
+       int sender_sock;
+       uint64_t cookie;
+       uint8_t buf[];
+};
+
 struct messaging_dgm_context {
+       struct tevent_context *ev;
        pid_t pid;
-       struct poll_funcs *msg_callbacks;
-       void *tevent_handle;
-       struct unix_msg_ctx *dgm_ctx;
        struct sun_path_buf socket_dir;
        struct sun_path_buf lockfile_dir;
        int lockfile_fd;
 
+       int sock;
+       struct tevent_fd *read_fde;
+       struct messaging_dgm_in_msg *in_msgs;
+
        void (*recv_cb)(const uint8_t *msg,
                        size_t msg_len,
                        int *fds,
@@ -51,14 +86,615 @@ struct messaging_dgm_context {
        void *recv_cb_private_data;
 
        bool *have_dgm_context;
+
+       struct pthreadpool_tevent *pool;
+       struct messaging_dgm_out *outsocks;
 };
 
-static struct messaging_dgm_context *global_dgm_context;
+/* Set socket close on exec. */
+static int prepare_socket_cloexec(int sock)
+{
+#ifdef FD_CLOEXEC
+       int flags;
 
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
-                              uint8_t *msg, size_t msg_len,
-                              int *fds, size_t num_fds,
-                              void *private_data);
+       flags = fcntl(sock, F_GETFD, 0);
+       if (flags == -1) {
+               return errno;
+       }
+       flags |= FD_CLOEXEC;
+       if (fcntl(sock, F_SETFD, flags) == -1) {
+               return errno;
+       }
+#endif
+       return 0;
+}
+
+static void close_fd_array(int *fds, size_t num_fds)
+{
+       size_t i;
+
+       for (i = 0; i < num_fds; i++) {
+               if (fds[i] == -1) {
+                       continue;
+               }
+
+               close(fds[i]);
+               fds[i] = -1;
+       }
+}
+
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+                                          struct tevent_timer *te,
+                                          struct timeval current_time,
+                                          void *private_data)
+{
+       struct messaging_dgm_out *out = talloc_get_type_abort(
+               private_data, struct messaging_dgm_out);
+       size_t qlen;
+
+       out->idle_timer = NULL;
+
+       qlen = tevent_queue_length(out->queue);
+       if (qlen == 0) {
+               TALLOC_FREE(out);
+       }
+}
+
+static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
+{
+       size_t qlen;
+
+       qlen = tevent_queue_length(out->queue);
+       if (qlen != 0) {
+               TALLOC_FREE(out->idle_timer);
+               return;
+       }
+
+       if (out->idle_timer != NULL) {
+               tevent_update_timer(out->idle_timer,
+                                   tevent_timeval_current_ofs(1, 0));
+               return;
+       }
+
+       out->idle_timer = tevent_add_timer(
+               out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
+               messaging_dgm_out_idle_handler, out);
+       /*
+        * No NULL check, we'll come back here. Worst case we're
+        * leaking a bit.
+        */
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+                                          struct tevent_timer *te,
+                                          struct timeval current_time,
+                                          void *private_data);
+
+static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
+                                   struct messaging_dgm_context *ctx,
+                                   pid_t pid, struct messaging_dgm_out **pout)
+{
+       struct messaging_dgm_out *out;
+       struct sockaddr_un addr = { .sun_family = AF_UNIX };
+       int ret = ENOMEM;
+       int out_pathlen;
+
+       out = talloc(mem_ctx, struct messaging_dgm_out);
+       if (out == NULL) {
+               goto fail;
+       }
+
+       *out = (struct messaging_dgm_out) {
+               .pid = pid,
+               .ctx = ctx,
+               .cookie = 1
+       };
+
+       out_pathlen = snprintf(addr.sun_path, sizeof(addr.sun_path),
+                              "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
+       if (out_pathlen < 0) {
+               goto errno_fail;
+       }
+       if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
+               ret = ENAMETOOLONG;
+               goto fail;
+       }
+
+       out->queue = tevent_queue_create(out, addr.sun_path);
+       if (out->queue == NULL) {
+               ret = ENOMEM;
+               goto fail;
+       }
+
+       out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+       if (out->sock == -1) {
+               goto errno_fail;
+       }
+
+       DLIST_ADD(ctx->outsocks, out);
+       talloc_set_destructor(out, messaging_dgm_out_destructor);
+
+       do {
+               ret = connect(out->sock,
+                             (const struct sockaddr *)(const void *)&addr,
+                             sizeof(addr));
+       } while ((ret == -1) && (errno == EINTR));
+
+       if (ret == -1) {
+               goto errno_fail;
+       }
+
+       ret = set_blocking(out->sock, false);
+       if (ret == -1) {
+               goto errno_fail;
+       }
+       out->is_blocking = false;
+
+       *pout = out;
+       return 0;
+errno_fail:
+       ret = errno;
+fail:
+       TALLOC_FREE(out);
+       return ret;
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
+{
+       DLIST_REMOVE(out->ctx->outsocks, out);
+
+       if (tevent_queue_length(out->queue) != 0) {
+               /*
+                * We have pending jobs. We can't close the socket,
+                * this has been handed over to messaging_dgm_out_queue_state.
+                */
+               return 0;
+       }
+
+       if (out->sock != -1) {
+               close(out->sock);
+               out->sock = -1;
+       }
+       return 0;
+}
+
+static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
+                                struct messaging_dgm_out **pout)
+{
+       struct messaging_dgm_out *out;
+       int ret;
+
+       for (out = ctx->outsocks; out != NULL; out = out->next) {
+               if (out->pid == pid) {
+                       break;
+               }
+       }
+
+       if (out == NULL) {
+               ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
+               if (ret != 0) {
+                       return ret;
+               }
+       }
+
+       messaging_dgm_out_rearm_idle_timer(out);
+
+       *pout = out;
+       return 0;
+}
+
+static ssize_t messaging_dgm_sendmsg(int sock,
+                                    const struct iovec *iov, int iovlen,
+                                    const int *fds, size_t num_fds,
+                                    int *perrno)
+{
+       struct msghdr msg;
+       ssize_t fdlen, ret;
+
+       /*
+        * Do the actual sendmsg syscall. This will be called from a
+        * pthreadpool helper thread, so be careful what you do here.
+        */
+
+       msg = (struct msghdr) {
+               .msg_iov = discard_const_p(struct iovec, iov),
+               .msg_iovlen = iovlen
+       };
+
+       fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
+       if (fdlen == -1) {
+               *perrno = EINVAL;
+               return -1;
+       }
+
+       {
+               uint8_t buf[fdlen];
+
+               msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
+
+               do {
+                       ret = sendmsg(sock, &msg, MSG_NOSIGNAL);
+               } while ((ret == -1) && (errno == EINTR));
+       }
+
+       if (ret == -1) {
+               *perrno = errno;
+       }
+       return ret;
+}
+
+struct messaging_dgm_out_queue_state {
+       struct tevent_context *ev;
+       struct pthreadpool_tevent *pool;
+
+       struct tevent_req *req;
+       struct tevent_req *subreq;
+
+       int sock;
+
+       int *fds;
+       uint8_t *buf;
+
+       ssize_t sent;
+       int err;
+};
+
+static int messaging_dgm_out_queue_state_destructor(
+       struct messaging_dgm_out_queue_state *state);
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+                                          void *private_data);
+static void messaging_dgm_out_threaded_job(void *private_data);
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
+
+static struct tevent_req *messaging_dgm_out_queue_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct messaging_dgm_out *out,
+       const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+       struct tevent_req *req;
+       struct messaging_dgm_out_queue_state *state;
+       struct tevent_queue_entry *e;
+       size_t i;
+       ssize_t buflen;
+
+       req = tevent_req_create(out, &state,
+                               struct messaging_dgm_out_queue_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->pool = out->ctx->pool;
+       state->sock = out->sock;
+       state->req = req;
+
+       /*
+        * Go blocking in a thread
+        */
+       if (!out->is_blocking) {
+               int ret = set_blocking(out->sock, true);
+               if (ret == -1) {
+                       tevent_req_error(req, errno);
+                       return tevent_req_post(req, ev);
+               }
+               out->is_blocking = true;
+       }
+
+       buflen = iov_buflen(iov, iovlen);
+       if (buflen == -1) {
+               tevent_req_error(req, EMSGSIZE);
+               return tevent_req_post(req, ev);
+       }
+
+       state->buf = talloc_array(state, uint8_t, buflen);
+       if (tevent_req_nomem(state->buf, req)) {
+               return tevent_req_post(req, ev);
+       }
+       iov_buf(iov, iovlen, state->buf, buflen);
+
+       state->fds = talloc_array(state, int, num_fds);
+       if (tevent_req_nomem(state->fds, req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       for (i=0; i<num_fds; i++) {
+               state->fds[i] = -1;
+       }
+
+       for (i=0; i<num_fds; i++) {
+
+               state->fds[i] = dup(fds[i]);
+
+               if (state->fds[i] == -1) {
+                       int ret = errno;
+
+                       close_fd_array(state->fds, num_fds);
+
+                       tevent_req_error(req, ret);
+                       return tevent_req_post(req, ev);
+               }
+       }
+
+       talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
+
+       e = tevent_queue_add_entry(out->queue, ev, req,
+                                  messaging_dgm_out_queue_trigger, req);
+       if (tevent_req_nomem(e, req)) {
+               return tevent_req_post(req, ev);
+       }
+       return req;
+}
+
+static int messaging_dgm_out_queue_state_destructor(
+       struct messaging_dgm_out_queue_state *state)
+{
+       int *fds;
+       size_t num_fds;
+
+       if (state->subreq != NULL) {
+               /*
+                * We're scheduled, but we're destroyed. This happens
+                * if the messaging_dgm_context is destroyed while
+                * we're stuck in a blocking send. There's nothing we
+                * can do but to leak memory.
+                */
+               TALLOC_FREE(state->subreq);
+               (void)talloc_reparent(state->req, NULL, state);
+               return -1;
+       }
+
+       fds = state->fds;
+       num_fds = talloc_array_length(fds);
+       close_fd_array(fds, num_fds);
+       return 0;
+}
+
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+                                          void *private_data)
+{
+       struct messaging_dgm_out_queue_state *state = tevent_req_data(
+               req, struct messaging_dgm_out_queue_state);
+
+       state->subreq = pthreadpool_tevent_job_send(
+               state, state->ev, state->pool,
+               messaging_dgm_out_threaded_job, state);
+       if (tevent_req_nomem(state->subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
+                               req);
+}
+
+static void messaging_dgm_out_threaded_job(void *private_data)
+{
+       struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
+               private_data, struct messaging_dgm_out_queue_state);
+
+       struct iovec iov = { .iov_base = state->buf,
+                            .iov_len = talloc_get_size(state->buf) };
+       size_t num_fds = talloc_array_length(state->fds);
+
+       state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
+                                           state->fds, num_fds, &state->err);
+}
+
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct messaging_dgm_out_queue_state *state = tevent_req_data(
+               req, struct messaging_dgm_out_queue_state);
+       int ret;
+
+       if (subreq != state->subreq) {
+               abort();
+       }
+
+       ret = pthreadpool_tevent_job_recv(subreq);
+
+       TALLOC_FREE(subreq);
+       state->subreq = NULL;
+
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       if (state->sent == -1) {
+               tevent_req_error(req, state->err);
+               return;
+       }
+       tevent_req_done(req);
+}
+
+static int messaging_dgm_out_queue_recv(struct tevent_req *req)
+{
+       return tevent_req_simple_recv_unix(req);
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
+
+static int messaging_dgm_out_send_fragment(
+       struct tevent_context *ev, struct messaging_dgm_out *out,
+       const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+       struct tevent_req *req;
+       size_t qlen;
+
+       qlen = tevent_queue_length(out->queue);
+       if (qlen == 0) {
+               ssize_t nsent;
+               int err = 0;
+
+               if (out->is_blocking) {
+                       int ret = set_blocking(out->sock, false);
+                       if (ret == -1) {
+                               return errno;
+                       }
+                       out->is_blocking = false;
+               }
+
+               nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
+                                             num_fds, &err);
+               if (nsent >= 0) {
+                       return 0;
+               }
+
+               if (err != EWOULDBLOCK) {
+                       return err;
+               }
+       }
+
+       req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
+                                          fds, num_fds);
+       if (req == NULL) {
+               return ENOMEM;
+       }
+       tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
+
+       return 0;
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
+{
+       struct messaging_dgm_out *out = tevent_req_callback_data(
+               req, struct messaging_dgm_out);
+       int ret;
+
+       ret = messaging_dgm_out_queue_recv(req);
+       TALLOC_FREE(req);
+
+       if (ret != 0) {
+               DBG_WARNING("messaging_out_queue_recv returned %s\n",
+                           strerror(ret));
+       }
+
+       messaging_dgm_out_rearm_idle_timer(out);
+}
+
+
+struct messaging_dgm_fragment_hdr {
+       size_t msglen;
+       pid_t pid;
+       int sock;
+};
+
+static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
+                                            struct messaging_dgm_out *out,
+                                            const struct iovec *iov,
+                                            int iovlen,
+                                            const int *fds, size_t num_fds)
+{
+       ssize_t msglen, sent;
+       int ret = 0;
+       struct iovec iov_copy[iovlen+2];
+       struct messaging_dgm_fragment_hdr hdr;
+       struct iovec src_iov;
+
+       if (iovlen < 0) {
+               return EINVAL;
+       }
+
+       msglen = iov_buflen(iov, iovlen);
+       if (msglen == -1) {
+               return EMSGSIZE;
+       }
+       if (num_fds > INT8_MAX) {
+               return EINVAL;
+       }
+
+       if ((size_t) msglen <=
+           (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
+               uint64_t cookie = 0;
+
+               iov_copy[0].iov_base = &cookie;
+               iov_copy[0].iov_len = sizeof(cookie);
+               if (iovlen > 0) {
+                       memcpy(&iov_copy[1], iov,
+                              sizeof(struct iovec) * iovlen);
+               }
+
+               return messaging_dgm_out_send_fragment(
+                       ev, out, iov_copy, iovlen+1, fds, num_fds);
+
+       }
+
+       hdr = (struct messaging_dgm_fragment_hdr) {
+               .msglen = msglen,
+               .pid = getpid(),
+               .sock = out->sock
+       };
+
+       iov_copy[0].iov_base = &out->cookie;
+       iov_copy[0].iov_len = sizeof(out->cookie);
+       iov_copy[1].iov_base = &hdr;
+       iov_copy[1].iov_len = sizeof(hdr);
+
+       sent = 0;
+       src_iov = iov[0];
+
+       /*
+        * The following write loop sends the user message in pieces. We have
+        * filled the first two iovecs above with "cookie" and "hdr". In the
+        * following loops we pull message chunks from the user iov array and
+        * fill iov_copy piece by piece, possibly truncating chunks from the
+        * caller's iov array. Ugly, but hopefully efficient.
+        */
+
+       while (sent < msglen) {
+               size_t fragment_len;
+               size_t iov_index = 2;
+
+               fragment_len = sizeof(out->cookie) + sizeof(hdr);
+
+               while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
+                       size_t space, chunk;
+
+                       space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
+                       chunk = MIN(space, src_iov.iov_len);
+
+                       iov_copy[iov_index].iov_base = src_iov.iov_base;
+                       iov_copy[iov_index].iov_len = chunk;
+                       iov_index += 1;
+
+                       src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+                       src_iov.iov_len -= chunk;
+                       fragment_len += chunk;
+
+                       if (src_iov.iov_len == 0) {
+                               iov += 1;
+                               iovlen -= 1;
+                               if (iovlen == 0) {
+                                       break;
+                               }
+                               src_iov = iov[0];
+                       }
+               }
+               sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
+
+               /*
+                * only the last fragment should pass the fd array.
+                * That simplifies the receiver a lot.
+                */
+               if (sent < msglen) {
+                       ret = messaging_dgm_out_send_fragment(
+                               ev, out, iov_copy, iov_index, NULL, 0);
+               } else {
+                       ret = messaging_dgm_out_send_fragment(
+                               ev, out, iov_copy, iov_index, fds, num_fds);
+               }
+               if (ret != 0) {
+                       break;
+               }
+       }
+
+       out->cookie += 1;
+       if (out->cookie == 0) {
+               out->cookie += 1;
+       }
+
+       return ret;
+}
+
+static struct messaging_dgm_context *global_dgm_context;
 
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
 
@@ -168,6 +804,11 @@ fail_close:
        return ret;
 }
 
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+                                      struct tevent_fd *fde,
+                                      uint16_t flags,
+                                      void *private_data);
+
 int messaging_dgm_init(struct tevent_context *ev,
                       uint64_t *punique,
                       const char *socket_dir,
@@ -193,6 +834,7 @@ int messaging_dgm_init(struct tevent_context *ev,
        if (ctx == NULL) {
                goto fail_nomem;
        }
+       ctx->ev = ev;
        ctx->pid = getpid();
        ctx->recv_cb = recv_cb;
        ctx->recv_cb_private_data = recv_cb_private_data;
@@ -229,30 +871,52 @@ int messaging_dgm_init(struct tevent_context *ev,
                return ret;
        }
 
-       ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
-       if (ctx->msg_callbacks == NULL) {
-               goto fail_nomem;
-       }
+       unlink(socket_address.sun_path);
 
-       ctx->tevent_handle = poll_funcs_tevent_register(
-               ctx, ctx->msg_callbacks, ev);
-       if (ctx->tevent_handle == NULL) {
-               goto fail_nomem;
+       ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+       if (ctx->sock == -1) {
+               ret = errno;
+               DBG_WARNING("socket failed: %s\n", strerror(ret));
+               TALLOC_FREE(ctx);
+               return ret;
        }
 
-       unlink(socket_address.sun_path);
+       ret = prepare_socket_cloexec(ctx->sock);
+       if (ret == -1) {
+               ret = errno;
+               DBG_WARNING("prepare_socket_cloexec failed: %s\n",
+                           strerror(ret));
+               TALLOC_FREE(ctx);
+               return ret;
+       }
 
-       ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024,
-                           messaging_dgm_recv, ctx, &ctx->dgm_ctx);
-       if (ret != 0) {
-               DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+       ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
+                  sizeof(socket_address));
+       if (ret == -1) {
+               ret = errno;
+               DBG_WARNING("bind failed: %s\n", strerror(ret));
                TALLOC_FREE(ctx);
                return ret;
        }
+
+       ctx->read_fde = tevent_add_fd(ctx->ev, ctx, ctx->sock, TEVENT_FD_READ,
+                                     messaging_dgm_read_handler, ctx);
+       if (ctx->read_fde == NULL) {
+               goto fail_nomem;
+       }
+
        talloc_set_destructor(ctx, messaging_dgm_context_destructor);
 
        ctx->have_dgm_context = &have_dgm_context;
 
+       ret = pthreadpool_tevent_init(ctx, 0, &ctx->pool);
+       if (ret != 0) {
+               DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
+                           strerror(ret));
+               TALLOC_FREE(ctx);
+               return ret;
+       }
+
        global_dgm_context = ctx;
        return 0;
 
@@ -263,16 +927,31 @@ fail_nomem:
 
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 {
-       /*
-        * First delete the socket to avoid races. The lockfile is the
-        * indicator that we're still around.
-        */
-       unix_msg_free(c->dgm_ctx);
+       while (c->outsocks != NULL) {
+               TALLOC_FREE(c->outsocks);
+       }
+       while (c->in_msgs != NULL) {
+               TALLOC_FREE(c->in_msgs);
+       }
+
+       TALLOC_FREE(c->read_fde);
+       close(c->sock);
 
        if (getpid() == c->pid) {
                struct sun_path_buf name;
                int ret;
 
+               ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
+                              c->socket_dir.buf, (unsigned)c->pid);
+               if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
+                       /*
+                        * We've checked the length when creating, so this
+                        * should never happen
+                        */
+                       abort();
+               }
+               unlink(name.buf);
+
                ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
                               c->lockfile_dir.buf, (unsigned)c->pid);
                if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
@@ -293,6 +972,174 @@ static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
        return 0;
 }
 
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+                              uint8_t *msg, size_t msg_len,
+                              int *fds, size_t num_fds);
+
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+                                      struct tevent_fd *fde,
+                                      uint16_t flags,
+                                      void *private_data)
+{
+       struct messaging_dgm_context *ctx = talloc_get_type_abort(
+               private_data, struct messaging_dgm_context);
+       ssize_t received;
+       struct msghdr msg;
+       struct iovec iov;
+       size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
+       uint8_t msgbuf[msgbufsize];
+       uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
+
+       if ((flags & TEVENT_FD_READ) == 0) {
+               return;
+       }
+
+       iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
+       msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
+
+       msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
+
+#ifdef MSG_CMSG_CLOEXEC
+       flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+       received = recvmsg(ctx->sock, &msg, 0);
+       if (received == -1) {
+               if ((errno == EAGAIN) ||
+                   (errno == EWOULDBLOCK) ||
+                   (errno == EINTR) ||
+                   (errno == ENOMEM)) {
+                       /* Not really an error - just try again. */
+                       return;
+               }
+               /* Problem with the socket. Set it unreadable. */
+               tevent_fd_set_flags(ctx->read_fde, 0);
+               return;
+       }
+
+       if ((size_t)received > sizeof(buf)) {
+               /* More than we expected, not for us */
+               return;
+       }
+
+       {
+               size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
+               size_t i;
+               int fds[num_fds];
+
+               msghdr_extract_fds(&msg, fds, num_fds);
+
+               for (i = 0; i < num_fds; i++) {
+                       int err;
+
+                       err = prepare_socket_cloexec(fds[i]);
+                       if (err != 0) {
+                               close_fd_array(fds, num_fds);
+                               num_fds = 0;
+                       }
+               }
+
+               messaging_dgm_recv(ctx, buf, received, fds, num_fds);
+       }
+
+}
+
+static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
+{
+       DLIST_REMOVE(m->ctx->in_msgs, m);
+       return 0;
+}
+
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+                              uint8_t *buf, size_t buflen,
+                              int *fds, size_t num_fds)
+{
+       struct messaging_dgm_fragment_hdr hdr;
+       struct messaging_dgm_in_msg *msg;
+       size_t space;
+       uint64_t cookie;
+
+       if (buflen < sizeof(cookie)) {
+               goto close_fds;
+       }
+       memcpy(&cookie, buf, sizeof(cookie));
+       buf += sizeof(cookie);
+       buflen -= sizeof(cookie);
+
+       if (cookie == 0) {
+               ctx->recv_cb(buf, buflen, fds, num_fds,
+                            ctx->recv_cb_private_data);
+               return;
+       }
+
+       if (buflen < sizeof(hdr)) {
+               goto close_fds;
+       }
+       memcpy(&hdr, buf, sizeof(hdr));
+       buf += sizeof(hdr);
+       buflen -= sizeof(hdr);
+
+       for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
+               if ((msg->sender_pid == hdr.pid) &&
+                   (msg->sender_sock == hdr.sock)) {
+                       break;
+               }
+       }
+
+       if ((msg != NULL) && (msg->cookie != cookie)) {
+               TALLOC_FREE(msg);
+       }
+
+       if (msg == NULL) {
+               size_t msglen;
+               msglen = offsetof(struct messaging_dgm_in_msg, buf) +
+                       hdr.msglen;
+
+               msg = talloc_size(ctx, msglen);
+               if (msg == NULL) {
+                       goto close_fds;
+               }
+               talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
+
+               *msg = (struct messaging_dgm_in_msg) {
+                       .ctx = ctx, .msglen = hdr.msglen,
+                       .sender_pid = hdr.pid, .sender_sock = hdr.sock,
+                       .cookie = cookie
+               };
+               DLIST_ADD(ctx->in_msgs, msg);
+               talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
+       }
+
+       space = msg->msglen - msg->received;
+       if (buflen > space) {
+               goto close_fds;
+       }
+
+       memcpy(msg->buf + msg->received, buf, buflen);
+       msg->received += buflen;
+
+       if (msg->received < msg->msglen) {
+               /*
+                * Any valid sender will send the fds in the last
+                * block. Invalid senders might have sent fd's that we
+                * need to close here.
+                */
+               goto close_fds;
+       }
+
+       DLIST_REMOVE(ctx->in_msgs, msg);
+       talloc_set_destructor(msg, NULL);
+
+       ctx->recv_cb(msg->buf, msg->msglen, fds, num_fds,
+                    ctx->recv_cb_private_data);
+
+       TALLOC_FREE(msg);
+       return;
+
+close_fds:
+       close_fd_array(fds, num_fds);
+}
+
 void messaging_dgm_destroy(void)
 {
        TALLOC_FREE(global_dgm_context);
@@ -303,44 +1150,25 @@ int messaging_dgm_send(pid_t pid,
                       const int *fds, size_t num_fds)
 {
        struct messaging_dgm_context *ctx = global_dgm_context;
-       struct sockaddr_un dst;
-       ssize_t dst_pathlen;
+       struct messaging_dgm_out *out;
        int ret;
 
        if (ctx == NULL) {
                return ENOTCONN;
        }
 
-       dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
-
-       dst_pathlen = snprintf(dst.sun_path, sizeof(dst.sun_path),
-                              "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
-       if (dst_pathlen < 0) {
-               return errno;
-       }
-       if ((size_t)dst_pathlen >= sizeof(dst.sun_path)) {
-               return ENAMETOOLONG;
+       ret = messaging_dgm_out_get(ctx, pid, &out);
+       if (ret != 0) {
+               return ret;
        }
 
        DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
 
-       ret = unix_msg_send(ctx->dgm_ctx, &dst, iov, iovlen, fds, num_fds);
-
+       ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
+                                               fds, num_fds);
        return ret;
 }
 
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
-                              uint8_t *msg, size_t msg_len,
-                              int *fds, size_t num_fds,
-                              void *private_data)
-{
-       struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
-               private_data, struct messaging_dgm_context);
-
-       dgm_ctx->recv_cb(msg, msg_len, fds, num_fds,
-                        dgm_ctx->recv_cb_private_data);
-}
-
 static int messaging_dgm_read_unique(int fd, uint64_t *punique)
 {
        char buf[25];
@@ -525,5 +1353,6 @@ void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
        if (ctx == NULL) {
                return NULL;
        }
-       return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);
+       return tevent_add_fd(ev, mem_ctx, ctx->sock, TEVENT_FD_READ,
+                            messaging_dgm_read_handler, ctx);
 }
index 21a76d3017e110bba6a1d78de99378e827d0f7f7..1598555353608e4a75c85946aeae75adffce17d5 100755 (executable)
@@ -302,8 +302,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 
 bld.SAMBA3_LIBRARY('messages_dgm',
                    source='''lib/messages_dgm.c lib/messages_dgm_ref.c''',
-                   deps='''talloc UNIX_MSG POLL_FUNCS_TEVENT samba-debug
-                           genrand''',
+                   deps='''talloc samba-debug PTHREADPOOL msghdr genrand''',
                    private_library=True)
 
 bld.SAMBA3_LIBRARY('messages_util',
@@ -355,7 +354,6 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
-                        UNIX_MSG
                         POLL_FUNCS_TEVENT
                         interfaces
                         param
@@ -1459,7 +1457,6 @@ bld.SAMBA3_BINARY('spotlight2sparql',
 bld.RECURSE('auth')
 bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
-bld.RECURSE('lib/unix_msg')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')