source4 messaging: clean up terminated processes
[vlendec/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
index 66188971f30fe19d93aa121114c69fcd6fb0dd99..413a19445eb9e019ec5a65be1d0c6946f3c4e3d1 100644 (file)
@@ -1,40 +1,45 @@
-/* 
+/*
    Unix SMB/CIFS implementation.
 
    Samba internal messaging functions
 
    Copyright (C) Andrew Tridgell 2004
-   
+
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
-   
+
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
-   
+
    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/events/events.h"
+#include "lib/util/server_id.h"
 #include "system/filesys.h"
 #include "messaging/messaging.h"
+#include "messaging/messaging_internal.h"
 #include "../lib/util/dlinklist.h"
 #include "lib/socket/socket.h"
 #include "librpc/gen_ndr/ndr_irpc.h"
 #include "lib/messaging/irpc.h"
-#include "lib/tdb_wrap/tdb_wrap.h"
 #include "../lib/util/unix_privs.h"
 #include "librpc/rpc/dcerpc.h"
-#include "../lib/tdb_compat/tdb_compat.h"
-#include "../lib/util/util_tdb.h"
 #include "cluster/cluster.h"
 #include "../lib/util/tevent_ntstatus.h"
 #include "lib/param/param.h"
+#include "lib/util/server_id_db.h"
+#include "lib/util/talloc_report.h"
+#include "../source3/lib/messages_dgm.h"
+#include "../source3/lib/messages_dgm_ref.h"
+#include "../source3/lib/messages_util.h"
+#include <tdb.h>
 
 /* change the message version with any incompatible changes in the protocol */
 #define IMESSAGING_VERSION 1
@@ -51,28 +56,6 @@ struct irpc_request {
        } incoming;
 };
 
-struct imessaging_context {
-       struct server_id server_id;
-       struct socket_context *sock;
-       const char *base_path;
-       const char *path;
-       struct loadparm_context *lp_ctx;
-       struct dispatch_fn **dispatch;
-       uint32_t num_types;
-       struct idr_context *dispatch_tree;
-       struct imessaging_rec *pending;
-       struct imessaging_rec *retry_queue;
-       struct irpc_list *irpc;
-       struct idr_context *idr;
-       const char **names;
-       struct timeval start_time;
-       struct tevent_timer *retry_te;
-       struct {
-               struct tevent_context *ev;
-               struct tevent_fd *fde;
-       } event;
-};
-
 /* we have a linked list of dispatch handlers for each msg_type that
    this messaging server can deal with */
 struct dispatch_fn {
@@ -83,23 +66,6 @@ struct dispatch_fn {
 };
 
 /* an individual message */
-struct imessaging_rec {
-       struct imessaging_rec *next, *prev;
-       struct imessaging_context *msg;
-       const char *path;
-
-       struct imessaging_header {
-               uint32_t version;
-               uint32_t msg_type;
-               struct server_id from;
-               struct server_id to;
-               uint32_t length;
-       } *header;
-
-       DATA_BLOB packet;
-       uint32_t retries;
-};
-
 
 static void irpc_handler(struct imessaging_context *, void *,
                         uint32_t, struct server_id, DATA_BLOB *);
@@ -111,272 +77,75 @@ static void irpc_handler(struct imessaging_context *, void *,
 static void ping_message(struct imessaging_context *msg, void *private_data,
                         uint32_t msg_type, struct server_id src, DATA_BLOB *data)
 {
-       char *task_id = server_id_str(NULL, &src);
+       struct server_id_buf idbuf;
        DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
-                task_id, (int)data->length,
+                server_id_str_buf(src, &idbuf), (int)data->length,
                 data->data?(const char *)data->data:""));
-       talloc_free(task_id);
        imessaging_send(msg, src, MSG_PONG, data);
 }
 
-/*
-  return uptime of messaging server via irpc
-*/
-static NTSTATUS irpc_uptime(struct irpc_message *msg, 
-                           struct irpc_uptime *r)
-{
-       struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
-       *r->out.start_time = timeval_to_nttime(&ctx->start_time);
-       return NT_STATUS_OK;
-}
-
-/* 
-   return the path to a messaging socket
-*/
-static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
-{
-       TALLOC_CTX *tmp_ctx = talloc_new(msg);
-       const char *id = server_id_str(tmp_ctx, &server_id);
-       char *s;
-       if (id == NULL) {
-               return NULL;
-       }
-       s = talloc_asprintf(msg, "%s/msg.%s", msg->base_path, id);
-       talloc_steal(s, tmp_ctx);
-       return s;
-}
-
-/*
-  dispatch a fully received message
-
-  note that this deliberately can match more than one message handler
-  per message. That allows a single messasging context to register
-  (for example) a debug handler for more than one piece of code
-*/
-static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
+static void pool_message(struct imessaging_context *msg, void *private_data,
+                        uint32_t msg_type, struct server_id src,
+                        DATA_BLOB *data)
 {
-       struct dispatch_fn *d, *next;
+       char *report;
 
-       /* temporary IDs use an idtree, the rest use a array of pointers */
-       if (rec->header->msg_type >= MSG_TMP_BASE) {
-               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
-                                                  rec->header->msg_type);
-       } else if (rec->header->msg_type < msg->num_types) {
-               d = msg->dispatch[rec->header->msg_type];
-       } else {
-               d = NULL;
-       }
+       report = talloc_report_str(msg, NULL);
 
-       for (; d; d = next) {
-               DATA_BLOB data;
-               next = d->next;
-               data.data = rec->packet.data + sizeof(*rec->header);
-               data.length = rec->header->length;
-               d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
+       if (report != NULL) {
+               DATA_BLOB blob = { .data = (uint8_t *)report,
+                                  .length = talloc_get_size(report) - 1};
+               imessaging_send(msg, src, MSG_POOL_USAGE, &blob);
        }
-       rec->header->length = 0;
+       talloc_free(report);
 }
 
-/*
-  handler for messages that arrive from other nodes in the cluster
-*/
-static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
-{
-       struct imessaging_rec *rec;
-
-       rec = talloc(msg, struct imessaging_rec);
-       if (rec == NULL) {
-               smb_panic("Unable to allocate imessaging_rec");
-       }
-
-       rec->msg           = msg;
-       rec->path          = msg->path;
-       rec->header        = (struct imessaging_header *)packet.data;
-       rec->packet        = packet;
-       rec->retries       = 0;
-
-       if (packet.length != sizeof(*rec->header) + rec->header->length) {
-               DEBUG(0,("messaging: bad message header size %d should be %d\n", 
-                        rec->header->length, (int)(packet.length - sizeof(*rec->header))));
-               talloc_free(rec);
-               return;
-       }
-
-       imessaging_dispatch(msg, rec);
-       talloc_free(rec);
-}
-
-
-
-/*
-  try to send the message
-*/
-static NTSTATUS try_send(struct imessaging_rec *rec)
+static void ringbuf_log_msg(struct imessaging_context *msg,
+                           void *private_data,
+                           uint32_t msg_type,
+                           struct server_id src,
+                           DATA_BLOB *data)
 {
-       struct imessaging_context *msg = rec->msg;
-       size_t nsent;
-       void *priv;
-       NTSTATUS status;
-       struct socket_address *path;
+       char *log = debug_get_ringbuf();
+       size_t logsize = debug_get_ringbuf_size();
+       DATA_BLOB blob;
 
-       /* rec->path is the path of the *other* socket, where we want
-        * this to end up */
-       path = socket_address_from_strings(msg, msg->sock->backend_name, 
-                                          rec->path, 0);
-       if (!path) {
-               return NT_STATUS_NO_MEMORY;
+       if (log == NULL) {
+               log = discard_const_p(char, "*disabled*\n");
+               logsize = strlen(log) + 1;
        }
 
-       /* we send with privileges so messages work from any context */
-       priv = root_privileges();
-       status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
-       talloc_free(path);
-       talloc_free(priv);
+       blob.data = (uint8_t *)log;
+       blob.length = logsize;
 
-       return status;
+       imessaging_send(msg, src, MSG_RINGBUF_LOG, &blob);
 }
 
 /*
-  retry backed off messages
-*/
-static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, 
-                           struct timeval t, void *private_data)
-{
-       struct imessaging_context *msg = talloc_get_type(private_data,
-                                                       struct imessaging_context);
-       msg->retry_te = NULL;
-
-       /* put the messages back on the main queue */
-       while (msg->retry_queue) {
-               struct imessaging_rec *rec = msg->retry_queue;
-               DLIST_REMOVE(msg->retry_queue, rec);
-               DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
-       }
-
-       TEVENT_FD_WRITEABLE(msg->event.fde);
-}
-
-/*
-  handle a socket write event
-*/
-static void imessaging_send_handler(struct imessaging_context *msg)
-{
-       while (msg->pending) {
-               struct imessaging_rec *rec = msg->pending;
-               NTSTATUS status;
-               status = try_send(rec);
-               if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
-                       rec->retries++;
-                       if (rec->retries > 3) {
-                               /* we're getting continuous write errors -
-                                  backoff this record */
-                               DLIST_REMOVE(msg->pending, rec);
-                               DLIST_ADD_END(msg->retry_queue, rec, 
-                                             struct imessaging_rec *);
-                               if (msg->retry_te == NULL) {
-                                       msg->retry_te = 
-                                               tevent_add_timer(msg->event.ev, msg,
-                                                               timeval_current_ofs(1, 0), 
-                                                               msg_retry_timer, msg);
-                               }
-                       }
-                       break;
-               }
-               rec->retries = 0;
-               if (!NT_STATUS_IS_OK(status)) {
-                       TALLOC_CTX *tmp_ctx = talloc_new(msg);
-                       DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
-                                server_id_str(tmp_ctx, &rec->header->from),
-                                server_id_str(tmp_ctx, &rec->header->to),
-                                rec->header->msg_type, 
-                                nt_errstr(status)));
-                       talloc_free(tmp_ctx);
-               }
-               DLIST_REMOVE(msg->pending, rec);
-               talloc_free(rec);
-       }
-       if (msg->pending == NULL) {
-               TEVENT_FD_NOT_WRITEABLE(msg->event.fde);
-       }
-}
-
-/*
-  handle a new incoming packet
+  return uptime of messaging server via irpc
 */
-static void imessaging_recv_handler(struct imessaging_context *msg)
+static NTSTATUS irpc_uptime(struct irpc_message *msg,
+                           struct irpc_uptime *r)
 {
-       struct imessaging_rec *rec;
-       NTSTATUS status;
-       DATA_BLOB packet;
-       size_t msize;
-
-       /* see how many bytes are in the next packet */
-       status = socket_pending(msg->sock, &msize);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0,("socket_pending failed in messaging - %s\n", 
-                        nt_errstr(status)));
-               return;
-       }
-       
-       packet = data_blob_talloc(msg, NULL, msize);
-       if (packet.data == NULL) {
-               /* assume this is temporary and retry */
-               return;
-       }
-           
-       status = socket_recv(msg->sock, packet.data, msize, &msize);
-       if (!NT_STATUS_IS_OK(status)) {
-               data_blob_free(&packet);
-               return;
-       }
-
-       if (msize < sizeof(*rec->header)) {
-               DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
-               data_blob_free(&packet);
-               return;
-       }
-
-       rec = talloc(msg, struct imessaging_rec);
-       if (rec == NULL) {
-               smb_panic("Unable to allocate imessaging_rec");
-       }
-
-       talloc_steal(rec, packet.data);
-       rec->msg           = msg;
-       rec->path          = msg->path;
-       rec->header        = (struct imessaging_header *)packet.data;
-       rec->packet        = packet;
-       rec->retries       = 0;
-
-       if (msize != sizeof(*rec->header) + rec->header->length) {
-               DEBUG(0,("messaging: bad message header size %d should be %d\n", 
-                        rec->header->length, (int)(msize - sizeof(*rec->header))));
-               talloc_free(rec);
-               return;
-       }
-
-       imessaging_dispatch(msg, rec);
-       talloc_free(rec);
+       struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
+       *r->out.start_time = timeval_to_nttime(&ctx->start_time);
+       return NT_STATUS_OK;
 }
 
-
-/*
-  handle a socket event
-*/
-static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
-                             uint16_t flags, void *private_data)
+static struct dispatch_fn *imessaging_find_dispatch(
+       struct imessaging_context *msg, uint32_t msg_type)
 {
-       struct imessaging_context *msg = talloc_get_type(private_data,
-                                                       struct imessaging_context);
-       if (flags & TEVENT_FD_WRITE) {
-               imessaging_send_handler(msg);
+       /* temporary IDs use an idtree, the rest use a array of pointers */
+       if (msg_type >= MSG_TMP_BASE) {
+               return (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+                                                     msg_type);
        }
-       if (flags & TEVENT_FD_READ) {
-               imessaging_recv_handler(msg);
+       if (msg_type < msg->num_types) {
+               return msg->dispatch[msg_type];
        }
+       return NULL;
 }
 
-
 /*
   Register a dispatch function for a particular message type.
 */
@@ -388,7 +157,7 @@ NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
        /* possibly expand dispatch array */
        if (msg_type >= msg->num_types) {
                struct dispatch_fn **dp;
-               int i;
+               uint32_t i;
                dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
                NT_STATUS_HAVE_NO_MEMORY(dp);
                msg->dispatch = dp;
@@ -444,7 +213,7 @@ void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, vo
        struct dispatch_fn *d, *next;
 
        if (msg_type >= msg->num_types) {
-               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
                                                   msg_type);
                if (!d) return;
                idr_remove(msg->dispatch_tree, msg_type);
@@ -462,120 +231,147 @@ void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, vo
 }
 
 /*
-  Send a message to a particular server
 */
-NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
-                       uint32_t msg_type, const DATA_BLOB *data)
+int imessaging_cleanup(struct imessaging_context *msg)
 {
-       struct imessaging_rec *rec;
-       NTSTATUS status;
-       size_t dlength = data?data->length:0;
-
-       rec = talloc(msg, struct imessaging_rec);
-       if (rec == NULL) {
-               return NT_STATUS_NO_MEMORY;
+       if (!msg) {
+               return 0;
        }
+       return 0;
+}
+
+static void imessaging_dgm_recv(struct tevent_context *ev,
+                               const uint8_t *buf, size_t buf_len,
+                               int *fds, size_t num_fds,
+                               void *private_data);
+
+/* Keep a list of imessaging contexts */
+static struct imessaging_context *msg_ctxs;
 
-       rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
-       if (rec->packet.data == NULL) {
-               talloc_free(rec);
+/*
+ * A process has terminated, clean-up any names it has registered.
+ */
+NTSTATUS imessaging_process_cleanup(
+       struct imessaging_context *msg_ctx,
+       pid_t pid)
+{
+       struct irpc_name_records *names = NULL;
+       int i = 0;
+       int j = 0;
+       TALLOC_CTX *mem_ctx = talloc_new(NULL);
+
+       if (mem_ctx == NULL) {
+               DBG_ERR("OOM unable to clean up messaging for process (%d)\n",
+                       pid);
                return NT_STATUS_NO_MEMORY;
        }
 
-       rec->retries       = 0;
-       rec->msg              = msg;
-       rec->header           = (struct imessaging_header *)rec->packet.data;
-       /* zero padding */
-       ZERO_STRUCTP(rec->header);
-       rec->header->version  = IMESSAGING_VERSION;
-       rec->header->msg_type = msg_type;
-       rec->header->from     = msg->server_id;
-       rec->header->to       = server;
-       rec->header->length   = dlength;
-       if (dlength != 0) {
-               memcpy(rec->packet.data + sizeof(*rec->header), 
-                      data->data, dlength);
-       }
-
-       if (!cluster_node_equal(&msg->server_id, &server)) {
-               /* the destination is on another node - dispatch via
-                  the cluster layer */
-               status = cluster_message_send(server, &rec->packet);
-               talloc_free(rec);
-               return status;
+       names = irpc_all_servers(msg_ctx, mem_ctx);
+       if (names == NULL) {
+               TALLOC_FREE(mem_ctx);
+               return NT_STATUS_OK;
+       }
+       for (i = 0; i < names->num_records; i++) {
+               for (j = 0; j < names->names[i]->count; j++) {
+                       if (names->names[i]->ids[j].pid == pid) {
+                               int ret = server_id_db_prune_name(
+                                       msg_ctx->names,
+                                       names->names[i]->name,
+                                       names->names[i]->ids[j]);
+                               if (ret != 0 && ret != ENOENT) {
+                                       TALLOC_FREE(mem_ctx);
+                                       return map_nt_error_from_unix_common(
+                                           ret);
+                               }
+                       }
+               }
        }
+       TALLOC_FREE(mem_ctx);
+       return NT_STATUS_OK;
+}
 
-       rec->path = imessaging_path(msg, server);
-       talloc_steal(rec, rec->path);
+static int imessaging_context_destructor(struct imessaging_context *msg)
+{
+       DLIST_REMOVE(msg_ctxs, msg);
+       TALLOC_FREE(msg->msg_dgm_ref);
+       return 0;
+}
 
-       if (msg->pending != NULL) {
-               status = STATUS_MORE_ENTRIES;
-       } else {
-               status = try_send(rec);
-       }
+/*
+ * Cleanup messaging dgm contexts on a specific event context.
+ *
+ * We must make sure to unref all messaging_dgm_ref's *before* the
+ * tevent context goes away. Only when the last ref is freed, the
+ * refcounted messaging dgm context will be freed.
+ */
+void imessaging_dgm_unref_ev(struct tevent_context *ev)
+{
+       struct imessaging_context *msg = NULL;
 
-       if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
-               if (msg->pending == NULL) {
-                       TEVENT_FD_WRITEABLE(msg->event.fde);
+       for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
+               if (msg->ev == ev) {
+                       TALLOC_FREE(msg->msg_dgm_ref);
                }
-               DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
-               return NT_STATUS_OK;
        }
-
-       talloc_free(rec);
-
-       return status;
 }
 
-/*
-  Send a message to a particular server, with the message containing a single pointer
-*/
-NTSTATUS imessaging_send_ptr(struct imessaging_context *msg, struct server_id server,
-                           uint32_t msg_type, void *ptr)
+static NTSTATUS imessaging_reinit(struct imessaging_context *msg)
 {
-       DATA_BLOB blob;
+       int ret = -1;
 
-       blob.data = (uint8_t *)&ptr;
-       blob.length = sizeof(void *);
+       TALLOC_FREE(msg->msg_dgm_ref);
 
-       return imessaging_send(msg, server, msg_type, &blob);
-}
+       msg->server_id.pid = getpid();
 
+       msg->msg_dgm_ref = messaging_dgm_ref(msg,
+                               msg->ev,
+                               &msg->server_id.unique_id,
+                               msg->sock_dir,
+                               msg->lock_dir,
+                               imessaging_dgm_recv,
+                               msg,
+                               &ret);
+
+       if (msg->msg_dgm_ref == NULL) {
+               DEBUG(2, ("messaging_dgm_ref failed: %s\n",
+                       strerror(ret)));
+               return map_nt_error_from_unix_common(ret);
+       }
+
+       server_id_db_reinit(msg->names, msg->server_id);
+       return NT_STATUS_OK;
+}
 
 /*
-  remove our messaging socket and database entry
-*/
-int imessaging_cleanup(struct imessaging_context *msg)
+ * Must be called after a fork.
+ */
+NTSTATUS imessaging_reinit_all(void)
 {
-       if (!msg) {
-               return 0;
-       }
+       struct imessaging_context *msg = NULL;
 
-       DEBUG(5,("imessaging: cleaning up %s\n", msg->path));
-       unlink(msg->path);
-       while (msg->names && msg->names[0]) {
-               irpc_remove_name(msg, msg->names[0]);
+       for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
+               NTSTATUS status = imessaging_reinit(msg);
+               if (!NT_STATUS_IS_OK(status)) {
+                       return status;
+               }
        }
-       return 0;
+       return NT_STATUS_OK;
 }
 
 /*
   create the listening socket and setup the dispatcher
-
-  use temporary=true when you want a destructor to remove the
-  associated messaging socket and database entry on talloc free. Don't
-  use this in processes that may fork and a child may talloc free this
-  memory
 */
-struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
+static struct imessaging_context *imessaging_init_internal(TALLOC_CTX *mem_ctx,
                                           struct loadparm_context *lp_ctx,
                                           struct server_id server_id,
-                                          struct tevent_context *ev,
-                                          bool auto_remove)
+                                          struct tevent_context *ev)
 {
-       struct imessaging_context *msg;
        NTSTATUS status;
-       struct socket_address *path;
+       struct imessaging_context *msg;
+       bool ok;
+       int ret;
+       const char *lock_dir = NULL;
+       int tdb_flags = TDB_INCOMPATIBLE_HASH | TDB_CLEAR_IF_FIRST;
 
        if (ev == NULL) {
                return NULL;
@@ -585,77 +381,266 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
        if (msg == NULL) {
                return NULL;
        }
+       msg->ev = ev;
 
-       /* setup a handler for messages from other cluster nodes, if appropriate */
-       status = cluster_message_init(msg, server_id, cluster_message_handler);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(msg);
-               return NULL;
-       }
+       talloc_set_destructor(msg, imessaging_context_destructor);
 
        /* create the messaging directory if needed */
 
-       msg->lp_ctx = talloc_reference(msg, lp_ctx);
-       if (!msg->lp_ctx) {
-               talloc_free(msg);
-               return NULL;
+       lock_dir = lpcfg_lock_directory(lp_ctx);
+       if (lock_dir == NULL) {
+               goto fail;
+       }
+
+       msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "msg.sock");
+       if (msg->sock_dir == NULL) {
+               goto fail;
+       }
+       ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700);
+       if (!ok) {
+               goto fail;
        }
 
-       msg->base_path     = lpcfg_imessaging_path(msg, lp_ctx);
+       msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg.lock");
+       if (msg->lock_dir == NULL) {
+               goto fail;
+       }
+       ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755);
+       if (!ok) {
+               goto fail;
+       }
 
-       mkdir(msg->base_path, 0700);
+       msg->msg_dgm_ref = messaging_dgm_ref(
+               msg, ev, &server_id.unique_id, msg->sock_dir, msg->lock_dir,
+               imessaging_dgm_recv, msg, &ret);
+
+       if (msg->msg_dgm_ref == NULL) {
+               goto fail;
+       }
 
-       msg->path          = imessaging_path(msg, server_id);
        msg->server_id     = server_id;
        msg->idr           = idr_init(msg);
+       if (msg->idr == NULL) {
+               goto fail;
+       }
+
        msg->dispatch_tree = idr_init(msg);
+       if (msg->dispatch_tree == NULL) {
+               goto fail;
+       }
+
        msg->start_time    = timeval_current();
 
-       status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
+       tdb_flags |= lpcfg_tdb_flags(lp_ctx, 0);
+
+       /*
+        * This context holds a destructor that cleans up any names
+        * registered on this context on talloc_free()
+        */
+       msg->names = server_id_db_init(msg, server_id, lock_dir, 0, tdb_flags);
+       if (msg->names == NULL) {
+               goto fail;
+       }
+
+       status = imessaging_register(msg, NULL, MSG_PING, ping_message);
        if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(msg);
-               return NULL;
+               goto fail;
+       }
+       status = imessaging_register(msg, NULL, MSG_REQ_POOL_USAGE,
+                                    pool_message);
+       if (!NT_STATUS_IS_OK(status)) {
+               goto fail;
+       }
+       status = imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
+       if (!NT_STATUS_IS_OK(status)) {
+               goto fail;
+       }
+       status = imessaging_register(msg, NULL, MSG_REQ_RINGBUF_LOG,
+                                    ringbuf_log_msg);
+       if (!NT_STATUS_IS_OK(status)) {
+               goto fail;
+       }
+       status = IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
+       if (!NT_STATUS_IS_OK(status)) {
+               goto fail;
        }
 
-       /* by stealing here we ensure that the socket is cleaned up (and even 
-          deleted) on exit */
-       talloc_steal(msg, msg->sock);
+       DLIST_ADD(msg_ctxs, msg);
 
-       path = socket_address_from_strings(msg, msg->sock->backend_name, 
-                                          msg->path, 0);
-       if (!path) {
-               talloc_free(msg);
-               return NULL;
+       return msg;
+fail:
+       talloc_free(msg);
+       return NULL;
+}
+
+struct imessaging_post_state {
+       struct imessaging_context *msg_ctx;
+       struct imessaging_post_state **busy_ref;
+       size_t buf_len;
+       uint8_t buf[];
+};
+
+static int imessaging_post_state_destructor(struct imessaging_post_state *state)
+{
+       if (state->busy_ref != NULL) {
+               *state->busy_ref = NULL;
+               state->busy_ref = NULL;
        }
+       return 0;
+}
 
-       status = socket_listen(msg->sock, path, 50, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
-               talloc_free(msg);
-               return NULL;
+static void imessaging_post_handler(struct tevent_context *ev,
+                                   struct tevent_immediate *ti,
+                                   void *private_data)
+{
+       struct imessaging_post_state *state = talloc_get_type_abort(
+               private_data, struct imessaging_post_state);
+
+       /*
+        * In usecases like using messaging_client_init() with irpc processing
+        * we may free the imessaging_context during the messaging handler.
+        * imessaging_post_state is a child of imessaging_context and
+        * might be implicitly free'ed before the explicit TALLOC_FREE(state).
+        *
+        * The busy_ref pointer makes sure the destructor clears
+        * the local 'state' variable.
+        */
+
+       SMB_ASSERT(state->busy_ref == NULL);
+       state->busy_ref = &state;
+
+       imessaging_dgm_recv(ev, state->buf, state->buf_len, NULL, 0,
+                           state->msg_ctx);
+
+       if (state == NULL) {
+               return;
        }
 
-       /* it needs to be non blocking for sends */
-       set_blocking(socket_get_fd(msg->sock), false);
+       state->busy_ref = NULL;
+       TALLOC_FREE(state);
+}
+
+static int imessaging_post_self(struct imessaging_context *msg,
+                               const uint8_t *buf, size_t buf_len)
+{
+       struct tevent_immediate *ti;
+       struct imessaging_post_state *state;
+
+       state = talloc_size(
+               msg, offsetof(struct imessaging_post_state, buf) + buf_len);
+       if (state == NULL) {
+               return ENOMEM;
+       }
+       talloc_set_name_const(state, "struct imessaging_post_state");
 
-       msg->event.ev   = ev;
-       msg->event.fde  = tevent_add_fd(ev, msg, socket_get_fd(msg->sock),
-                                       TEVENT_FD_READ, imessaging_handler, msg);
-       tevent_fd_set_auto_close(msg->event.fde);
+       talloc_set_destructor(state, imessaging_post_state_destructor);
 
-       if (auto_remove) {
-               talloc_set_destructor(msg, imessaging_cleanup);
+       ti = tevent_create_immediate(state);
+       if (ti == NULL) {
+               TALLOC_FREE(state);
+               return ENOMEM;
        }
-       
-       imessaging_register(msg, NULL, MSG_PING, ping_message);
-       imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
-       IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
 
-       return msg;
+       state->msg_ctx = msg;
+       state->busy_ref = NULL;
+       state->buf_len = buf_len;
+       memcpy(state->buf, buf, buf_len);
+
+       tevent_schedule_immediate(ti, msg->ev, imessaging_post_handler,
+                                 state);
+
+       return 0;
+}
+
+static void imessaging_dgm_recv(struct tevent_context *ev,
+                               const uint8_t *buf, size_t buf_len,
+                               int *fds, size_t num_fds,
+                               void *private_data)
+{
+       struct imessaging_context *msg = talloc_get_type_abort(
+               private_data, struct imessaging_context);
+       uint32_t msg_type;
+       struct server_id src, dst;
+       struct server_id_buf srcbuf, dstbuf;
+       DATA_BLOB data;
+
+       if (buf_len < MESSAGE_HDR_LENGTH) {
+               /* Invalid message, ignore */
+               return;
+       }
+
+       if (num_fds != 0) {
+               /*
+                * Source4 based messaging does not expect fd's yet
+                */
+               return;
+       }
+
+       if (ev != msg->ev) {
+               int ret;
+               ret = imessaging_post_self(msg, buf, buf_len);
+               if (ret != 0) {
+                       DBG_WARNING("imessaging_post_self failed: %s\n",
+                                   strerror(ret));
+               }
+               return;
+       }
+
+       message_hdr_get(&msg_type, &src, &dst, buf);
+
+       data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
+       data.length = buf_len - MESSAGE_HDR_LENGTH;
+
+       if ((cluster_id_equal(&dst, &msg->server_id)) ||
+           ((dst.task_id == 0) && (msg->server_id.pid == 0))) {
+               struct dispatch_fn *d, *next;
+
+               DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n",
+                          __func__,
+                          server_id_str_buf(dst, &dstbuf),
+                          server_id_str_buf(msg->server_id, &srcbuf),
+                          (unsigned)msg_type));
+
+               d = imessaging_find_dispatch(msg, msg_type);
+
+               for (; d; d = next) {
+                       next = d->next;
+                       d->fn(msg, d->private_data, d->msg_type, src, &data);
+               }
+       } else {
+               DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n",
+                          __func__, (unsigned)msg_type,
+                          server_id_str_buf(dst, &dstbuf),
+                          server_id_str_buf(msg->server_id, &srcbuf)));
+       }
+}
+
+struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
+                                          struct loadparm_context *lp_ctx,
+                                          struct server_id server_id,
+                                          struct tevent_context *ev)
+{
+       if (ev == NULL) {
+               return NULL;
+       }
+
+       if (tevent_context_is_wrapper(ev)) {
+               /*
+                * This is really a programmer error!
+                *
+                * The main/raw tevent context should
+                * have been registered first!
+                */
+               DBG_ERR("Should not be used with a wrapper tevent context\n");
+               errno = EINVAL;
+               return NULL;
+       }
+
+       return imessaging_init_internal(mem_ctx, lp_ctx, server_id, ev);
 }
 
-/* 
-   A hack, for the short term until we get 'client only' messaging in place 
+/*
+   A hack, for the short term until we get 'client only' messaging in place
 */
 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
                                                  struct loadparm_context *lp_ctx,
@@ -670,7 +655,7 @@ struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
        /* This is because we are not in the s3 serverid database */
        id.unique_id = SERVERID_UNIQUE_ID_NOT_TO_VERIFY;
 
-       return imessaging_init(mem_ctx, lp_ctx, id, ev, true);
+       return imessaging_init_internal(mem_ctx, lp_ctx, id, ev);
 }
 /*
   a list of registered irpc server functions
@@ -689,7 +674,7 @@ struct irpc_list {
   register a irpc server function
 */
 NTSTATUS irpc_register(struct imessaging_context *msg_ctx,
-                      const struct ndr_interface_table *table, 
+                      const struct ndr_interface_table *table,
                       int callnum, irpc_function_t fn, void *private_data)
 {
        struct irpc_list *irpc;
@@ -814,7 +799,6 @@ static void irpc_handler_request(struct imessaging_context *msg_ctx,
        m->msg_ctx     = msg_ctx;
        m->irpc        = i;
        m->data        = r;
-       m->ev          = msg_ctx->event.ev;
 
        m->header.status = i->fn(m, r);
 
@@ -884,112 +868,31 @@ static int irpc_destructor(struct irpc_request *irpc)
        return 0;
 }
 
-/*
-  open the naming database
-*/
-static struct tdb_wrap *irpc_namedb_open(struct imessaging_context *msg_ctx)
-{
-       struct tdb_wrap *t;
-       char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
-       if (path == NULL) {
-               return NULL;
-       }
-       t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660, msg_ctx->lp_ctx);
-       talloc_free(path);
-       return t;
-}
-       
-
 /*
   add a string name that this irpc server can be called on
-*/
-NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
-{
-       struct tdb_wrap *t;
-       TDB_DATA rec;
-       int count;
-       NTSTATUS status = NT_STATUS_OK;
-
-       t = irpc_namedb_open(msg_ctx);
-       NT_STATUS_HAVE_NO_MEMORY(t);
-
-       if (tdb_lock_bystring(t->tdb, name) != 0) {
-               talloc_free(t);
-               return NT_STATUS_LOCK_NOT_GRANTED;
-       }
-       rec = tdb_fetch_bystring(t->tdb, name);
-       count = rec.dsize / sizeof(struct server_id);
-       rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
-       rec.dsize += sizeof(struct server_id);
-       if (rec.dptr == NULL) {
-               tdb_unlock_bystring(t->tdb, name);
-               talloc_free(t);
-               return NT_STATUS_NO_MEMORY;
-       }
-       ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
-       if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
-               status = NT_STATUS_INTERNAL_ERROR;
-       }
-       free(rec.dptr);
-       tdb_unlock_bystring(t->tdb, name);
-       talloc_free(t);
 
-       msg_ctx->names = str_list_add(msg_ctx->names, name);
-       talloc_steal(msg_ctx, msg_ctx->names);
-
-       return status;
-}
-
-/*
-  return a list of server ids for a server name
+  It will be removed from the DB either via irpc_remove_name or on
+  talloc_free(msg_ctx->names).
 */
-struct server_id *irpc_servers_byname(struct imessaging_context *msg_ctx,
-                                     TALLOC_CTX *mem_ctx,
-                                     const char *name)
+NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
 {
-       struct tdb_wrap *t;
-       TDB_DATA rec;
-       int count, i;
-       struct server_id *ret;
-
-       t = irpc_namedb_open(msg_ctx);
-       if (t == NULL) {
-               return NULL;
-       }
+       int ret;
 
-       if (tdb_lock_bystring(t->tdb, name) != 0) {
-               talloc_free(t);
-               return NULL;
-       }
-       rec = tdb_fetch_bystring(t->tdb, name);
-       if (rec.dptr == NULL) {
-               tdb_unlock_bystring(t->tdb, name);
-               talloc_free(t);
-               return NULL;
-       }
-       count = rec.dsize / sizeof(struct server_id);
-       ret = talloc_array(mem_ctx, struct server_id, count+1);
-       if (ret == NULL) {
-               tdb_unlock_bystring(t->tdb, name);
-               talloc_free(t);
-               return NULL;
+       ret = server_id_db_add(msg_ctx->names, name);
+       if (ret != 0) {
+               return map_nt_error_from_unix_common(ret);
        }
-       for (i=0;i<count;i++) {
-               ret[i] = ((struct server_id *)rec.dptr)[i];
-       }
-       ret[i] = cluster_id(0, 0);
-       free(rec.dptr);
-       tdb_unlock_bystring(t->tdb, name);
-       talloc_free(t);
-
-       return ret;
+       return NT_STATUS_OK;
 }
 
-static int all_servers_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int all_servers_func(const char *name, unsigned num_servers,
+                           const struct server_id *servers,
+                           void *private_data)
 {
-       struct irpc_name_records *name_records = talloc_get_type(state, struct irpc_name_records);
+       struct irpc_name_records *name_records = talloc_get_type(
+               private_data, struct irpc_name_records);
        struct irpc_name_record *name_record;
-       int i;
+       uint32_t i;
 
        name_records->names
                = talloc_realloc(name_records, name_records->names,
@@ -1007,22 +910,19 @@ static int all_servers_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data
 
        name_records->num_records++;
 
-       name_record->name
-               = talloc_strndup(name_record,
-                                (const char *)key.dptr, key.dsize);
+       name_record->name = talloc_strdup(name_record, name);
        if (!name_record->name) {
                return -1;
        }
 
-       name_record->count = data.dsize / sizeof(struct server_id);
-       name_record->ids = talloc_array(name_record,
-                                       struct server_id,
-                                       name_record->count);
+       name_record->count = num_servers;
+       name_record->ids = talloc_array(name_record, struct server_id,
+                                       num_servers);
        if (name_record->ids == NULL) {
                return -1;
        }
        for (i=0;i<name_record->count;i++) {
-               name_record->ids[i] = ((struct server_id *)data.dptr)[i];
+               name_record->ids[i] = servers[i];
        }
        return 0;
 }
@@ -1033,26 +933,19 @@ static int all_servers_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data
 struct irpc_name_records *irpc_all_servers(struct imessaging_context *msg_ctx,
                                           TALLOC_CTX *mem_ctx)
 {
-       struct tdb_wrap *t;
        int ret;
        struct irpc_name_records *name_records = talloc_zero(mem_ctx, struct irpc_name_records);
        if (name_records == NULL) {
                return NULL;
        }
 
-       t = irpc_namedb_open(msg_ctx);
-       if (t == NULL) {
-               return NULL;
-       }
-
-       ret = tdb_traverse_read(t->tdb, all_servers_func, name_records);
+       ret = server_id_db_traverse_read(msg_ctx->names, all_servers_func,
+                                        name_records);
        if (ret == -1) {
-               talloc_free(t);
+               TALLOC_FREE(name_records);
                return NULL;
        }
 
-       talloc_free(t);
-
        return name_records;
 }
 
@@ -1061,50 +954,7 @@ struct irpc_name_records *irpc_all_servers(struct imessaging_context *msg_ctx,
 */
 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name)
 {
-       struct tdb_wrap *t;
-       TDB_DATA rec;
-       int count, i;
-       struct server_id *ids;
-
-       str_list_remove(msg_ctx->names, name);
-
-       t = irpc_namedb_open(msg_ctx);
-       if (t == NULL) {
-               return;
-       }
-
-       if (tdb_lock_bystring(t->tdb, name) != 0) {
-               talloc_free(t);
-               return;
-       }
-       rec = tdb_fetch_bystring(t->tdb, name);
-       if (rec.dptr == NULL) {
-               tdb_unlock_bystring(t->tdb, name);
-               talloc_free(t);
-               return;
-       }
-       count = rec.dsize / sizeof(struct server_id);
-       if (count == 0) {
-               free(rec.dptr);
-               tdb_unlock_bystring(t->tdb, name);
-               talloc_free(t);
-               return;
-       }
-       ids = (struct server_id *)rec.dptr;
-       for (i=0;i<count;i++) {
-               if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
-                       if (i < count-1) {
-                               memmove(ids+i, ids+i+1, 
-                                       sizeof(struct server_id) * (count-(i+1)));
-                       }
-                       rec.dsize -= sizeof(struct server_id);
-                       break;
-               }
-       }
-       tdb_store_bystring(t->tdb, name, rec, 0);
-       free(rec.dptr);
-       tdb_unlock_bystring(t->tdb, name);
-       talloc_free(t);
+       server_id_db_remove(msg_ctx->names, name);
 }
 
 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx)
@@ -1372,9 +1222,9 @@ static const struct dcerpc_binding_handle_ops irpc_bh_ops = {
 
 /* initialise a irpc binding handle */
 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
-                                       struct imessaging_context *msg_ctx,
-                                       struct server_id server_id,
-                                       const struct ndr_interface_table *table)
+                                                 struct imessaging_context *msg_ctx,
+                                                 struct server_id server_id,
+                                                 const struct ndr_interface_table *table)
 {
        struct dcerpc_binding_handle *h;
        struct irpc_bh_state *hs;
@@ -1394,28 +1244,25 @@ struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
        hs->table = table;
        hs->timeout = IRPC_CALL_TIMEOUT;
 
-       dcerpc_binding_handle_set_sync_ev(h, msg_ctx->event.ev);
-
        return h;
 }
 
 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx,
-                                       struct imessaging_context *msg_ctx,
-                                       const char *dest_task,
-                                       const struct ndr_interface_table *table)
+                                                         struct imessaging_context *msg_ctx,
+                                                         const char *dest_task,
+                                                         const struct ndr_interface_table *table)
 {
        struct dcerpc_binding_handle *h;
+       unsigned num_sids;
        struct server_id *sids;
        struct server_id sid;
+       NTSTATUS status;
 
        /* find the server task */
-       sids = irpc_servers_byname(msg_ctx, mem_ctx, dest_task);
-       if (sids == NULL) {
-               errno = EADDRNOTAVAIL;
-               return NULL;
-       }
-       if (sids[0].pid == 0) {
-               talloc_free(sids);
+
+       status = irpc_servers_byname(msg_ctx, mem_ctx, dest_task,
+                                    &num_sids, &sids);
+       if (!NT_STATUS_IS_OK(status)) {
                errno = EADDRNOTAVAIL;
                return NULL;
        }