Fix a few more breakages from our recent changes to the server_id
[samba.git] / source4 / lib / messaging / messaging.c
index 041554a7c04a52a68add8745e89164242427a84a..6a879ab962774f8008af09b56aea34b949042b37 100644 (file)
@@ -7,7 +7,7 @@
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
-#include "events.h"
-#include "system/time.h"
-#include "messages.h"
-#include "dlinklist.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "messaging/messaging.h"
+#include "lib/util/dlinklist.h"
+#include "lib/socket/socket.h"
+#include "librpc/gen_ndr/ndr_irpc.h"
+#include "lib/messaging/irpc.h"
+#include "tdb_wrap.h"
+#include "lib/util/unix_privs.h"
+#include "librpc/rpc/dcerpc.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/util/util_tdb.h"
+#include "lib/util/util_tdb.h"
+#include "cluster/cluster.h"
+#include "param/param.h"
 
 /* change the message version with any incompatible changes in the protocol */
 #define MESSAGING_VERSION 1
 
-/* the number of microseconds to backoff in retrying to send a message */
-#define MESSAGING_BACKOFF 250000
-
 struct messaging_context {
-       servid_t server_id;
+       struct server_id server_id;
        struct socket_context *sock;
-       char *path;
-       struct dispatch_fn *dispatch;
-
+       const char *base_path;
+       const char *path;
+       struct dispatch_fn **dispatch;
+       uint32_t num_types;
+       struct idr_context *dispatch_tree;
+       struct messaging_rec *pending;
+       struct messaging_rec *retry_queue;
+       struct smb_iconv_convenience *iconv_convenience;
+       struct irpc_list *irpc;
+       struct idr_context *idr;
+       const char **names;
+       struct timeval start_time;
+       struct timed_event *retry_te;
        struct {
                struct event_context *ev;
                struct fd_event *fde;
        } event;
 };
 
-/* we have a linked list of dispatch handlers that this messaging
-   server can deal with */
+/* we have a linked list of dispatch handlers for each msg_type that
+   this messaging server can deal with */
 struct dispatch_fn {
        struct dispatch_fn *next, *prev;
        uint32_t msg_type;
        void *private;
-       void (*fn)(struct messaging_context *msg, void *private, 
-                  uint32_t msg_type, servid_t server_id, DATA_BLOB *data);
+       msg_callback_t fn;
 };
 
 /* an individual message */
 struct messaging_rec {
+       struct messaging_rec *next, *prev;
        struct messaging_context *msg;
-       struct socket_context *sock;
-       struct fd_event *fde;
        const char *path;
 
-       struct {
+       struct messaging_header {
                uint32_t version;
                uint32_t msg_type;
-               servid_t from;
-               servid_t to;
+               struct server_id from;
+               struct server_id to;
                uint32_t length;
-       } header;
-
-       DATA_BLOB data;
+       } *header;
 
-       uint32_t ndone;
+       DATA_BLOB packet;
+       uint32_t retries;
 };
 
+
+static void irpc_handler(struct messaging_context *, void *, 
+                        uint32_t, struct server_id, DATA_BLOB *);
+
+
 /*
  A useful function for testing the message system.
 */
 static void ping_message(struct messaging_context *msg, void *private, 
-                        uint32_t msg_type, servid_t src, DATA_BLOB *data)
+                        uint32_t msg_type, struct server_id src, DATA_BLOB *data)
 {
-       DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
-                (uint_t)src, data->length, data->data?(const char *)data->data:""));
+       DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
+                (uint_t)src.node, (uint_t)src.id, (int)data->length, 
+                data->data?(const char *)data->data:""));
        messaging_send(msg, src, MSG_PONG, data);
 }
 
+/*
+  return uptime of messaging server via irpc
+*/
+static NTSTATUS irpc_uptime(struct irpc_message *msg, 
+                           struct irpc_uptime *r)
+{
+       struct messaging_context *ctx = talloc_get_type(msg->private, struct messaging_context);
+       *r->out.start_time = timeval_to_nttime(&ctx->start_time);
+       return NT_STATUS_OK;
+}
+
 /* 
    return the path to a messaging socket
 */
-static char *messaging_path(TALLOC_CTX *mem_ctx, servid_t server_id)
+static char *messaging_path(struct messaging_context *msg, struct server_id server_id)
 {
-       char *name = talloc_asprintf(mem_ctx, "messaging/msg.%u", (unsigned)server_id);
-       char *ret;
-       ret = smbd_tmp_path(mem_ctx, name);
-       talloc_free(name);
-       return ret;
+       return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, 
+                              cluster_id_string(msg, server_id));
 }
 
 /*
   dispatch a fully received message
+
+  note that this deliberately can match more than one message handler
+  per message. That allows a single messasging context to register
+  (for example) a debug handler for more than one piece of code
 */
 static void messaging_dispatch(struct messaging_context *msg, struct messaging_rec *rec)
 {
        struct dispatch_fn *d, *next;
-       for (d=msg->dispatch;d;d=next) {
-               next = d->next;
-               if (d->msg_type == rec->header.msg_type) {
-                       d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
-               }
+
+       /* temporary IDs use an idtree, the rest use a array of pointers */
+       if (rec->header->msg_type >= MSG_TMP_BASE) {
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+                                                  rec->header->msg_type);
+       } else if (rec->header->msg_type < msg->num_types) {
+               d = msg->dispatch[rec->header->msg_type];
+       } else {
+               d = NULL;
        }
 
-       /* we don't free the record itself here as there may
-          be more messages from this client */
-       data_blob_free(&rec->data);
-       rec->header.length = 0;
-       rec->ndone = 0;
+       for (; d; d = next) {
+               DATA_BLOB data;
+               next = d->next;
+               data.data = rec->packet.data + sizeof(*rec->header);
+               data.length = rec->header->length;
+               d->fn(msg, d->private, d->msg_type, rec->header->from, &data);
+       }
+       rec->header->length = 0;
 }
 
-
 /*
-  handle IO for a single message
+  handler for messages that arrive from other nodes in the cluster
 */
-static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, 
-                                  struct timeval t, uint16_t flags)
+static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet)
 {
-       struct messaging_rec *rec = fde->private;
-       struct messaging_context *msg = rec->msg;
-       NTSTATUS status;
-
-       if (rec->ndone < sizeof(rec->header)) {
-               /* receive the header */
-               size_t nread;
-
-               status = socket_recv(rec->sock, 
-                                    rec->ndone + (char *)&rec->header,
-                                    sizeof(rec->header) - rec->ndone, &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
+       struct messaging_rec *rec;
 
-               if (nread == 0) {
-                       return;
-               }
+       rec = talloc(msg, struct messaging_rec);
+       if (rec == NULL) {
+               smb_panic("Unable to allocate messaging_rec");
+       }
 
-               rec->ndone += nread;
+       rec->msg           = msg;
+       rec->path          = msg->path;
+       rec->header        = (struct messaging_header *)packet.data;
+       rec->packet        = packet;
+       rec->retries       = 0;
 
-               if (rec->ndone == sizeof(rec->header)) {
-                       if (rec->header.version != MESSAGING_VERSION) {
-                               DEBUG(0,("meessage with wrong version %u\n",
-                                        rec->header.version));
-                               talloc_free(rec);
-                       }
-                       rec->data = data_blob_talloc(rec, NULL, rec->header.length);
-                       if (rec->data.length != rec->header.length) {
-                               DEBUG(0,("Unable to allocate message of size %u\n",
-                                        rec->header.length));
-                               talloc_free(rec);
-                       }
-               }
+       if (packet.length != sizeof(*rec->header) + rec->header->length) {
+               DEBUG(0,("messaging: bad message header size %d should be %d\n", 
+                        rec->header->length, (int)(packet.length - sizeof(*rec->header))));
+               talloc_free(rec);
+               return;
        }
 
-       if (rec->ndone >= sizeof(rec->header) && 
-           rec->ndone < sizeof(rec->header) + rec->header.length) {
-               /* receive the body, if any */
-               size_t nread;
+       messaging_dispatch(msg, rec);
+       talloc_free(rec);
+}
 
-               status = socket_recv(rec->sock, 
-                                    rec->data.data + (rec->ndone - sizeof(rec->header)),
-                                    sizeof(rec->header) + rec->header.length - rec->ndone, 
-                                    &nread, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
 
-               if (nread == 0) {
-                       return;
-               }
 
-               rec->ndone += nread;
+/*
+  try to send the message
+*/
+static NTSTATUS try_send(struct messaging_rec *rec)
+{
+       struct messaging_context *msg = rec->msg;
+       size_t nsent;
+       void *priv;
+       NTSTATUS status;
+       struct socket_address *path;
+
+       /* rec->path is the path of the *other* socket, where we want
+        * this to end up */
+       path = socket_address_from_strings(msg, msg->sock->backend_name, 
+                                          rec->path, 0);
+       if (!path) {
+               return NT_STATUS_NO_MEMORY;
        }
 
-       if (rec->ndone == sizeof(rec->header) + rec->header.length) {
-               /* we've got the whole message */
-               messaging_dispatch(msg, rec);
+       /* we send with privileges so messages work from any context */
+       priv = root_privileges();
+       status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
+       talloc_free(path);
+       talloc_free(priv);
+
+       return status;
+}
+
+/*
+  retry backed off messages
+*/
+static void msg_retry_timer(struct event_context *ev, struct timed_event *te, 
+                           struct timeval t, void *private)
+{
+       struct messaging_context *msg = talloc_get_type(private, 
+                                                       struct messaging_context);
+       msg->retry_te = NULL;
+
+       /* put the messages back on the main queue */
+       while (msg->retry_queue) {
+               struct messaging_rec *rec = msg->retry_queue;
+               DLIST_REMOVE(msg->retry_queue, rec);
+               DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
        }
+
+       EVENT_FD_WRITEABLE(msg->event.fde);     
 }
 
 /*
-  destroy a messaging record
+  handle a socket write event
 */
-static int rec_destructor(void *ptr)
+static void messaging_send_handler(struct messaging_context *msg)
 {
-       struct messaging_rec *rec = ptr;
-       struct messaging_context *msg = rec->msg;
-       event_remove_fd(msg->event.ev, rec->fde);
-       return 0;
+       while (msg->pending) {
+               struct messaging_rec *rec = msg->pending;
+               NTSTATUS status;
+               status = try_send(rec);
+               if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+                       rec->retries++;
+                       if (rec->retries > 3) {
+                               /* we're getting continuous write errors -
+                                  backoff this record */
+                               DLIST_REMOVE(msg->pending, rec);
+                               DLIST_ADD_END(msg->retry_queue, rec, 
+                                             struct messaging_rec *);
+                               if (msg->retry_te == NULL) {
+                                       msg->retry_te = 
+                                               event_add_timed(msg->event.ev, msg, 
+                                                               timeval_current_ofs(1, 0), 
+                                                               msg_retry_timer, msg);
+                               }
+                       }
+                       break;
+               }
+               rec->retries = 0;
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
+                                cluster_id_string(debug_ctx(), rec->header->from), 
+                                cluster_id_string(debug_ctx(), rec->header->to), 
+                                rec->header->msg_type, 
+                                nt_errstr(status)));
+               }
+               DLIST_REMOVE(msg->pending, rec);
+               talloc_free(rec);
+       }
+       if (msg->pending == NULL) {
+               EVENT_FD_NOT_WRITEABLE(msg->event.fde);
+       }
 }
 
 /*
-  handle a new incoming connection
+  handle a new incoming packet
 */
-static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde, 
-                                    struct timeval t, uint16_t flags)
+static void messaging_recv_handler(struct messaging_context *msg)
 {
-       struct messaging_context *msg = fde->private;
        struct messaging_rec *rec;
        NTSTATUS status;
-       struct fd_event fde2;
+       DATA_BLOB packet;
+       size_t msize;
 
-       rec = talloc_p(msg, struct messaging_rec);
+       /* see how many bytes are in the next packet */
+       status = socket_pending(msg->sock, &msize);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(0,("socket_pending failed in messaging - %s\n", 
+                        nt_errstr(status)));
+               return;
+       }
+       
+       packet = data_blob_talloc(msg, NULL, msize);
+       if (packet.data == NULL) {
+               /* assume this is temporary and retry */
+               return;
+       }
+           
+       status = socket_recv(msg->sock, packet.data, msize, &msize);
+       if (!NT_STATUS_IS_OK(status)) {
+               data_blob_free(&packet);
+               return;
+       }
+
+       if (msize < sizeof(*rec->header)) {
+               DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
+               data_blob_free(&packet);
+               return;
+       }
+
+       rec = talloc(msg, struct messaging_rec);
        if (rec == NULL) {
                smb_panic("Unable to allocate messaging_rec");
        }
 
-       status = socket_accept(msg->sock, &rec->sock);
-       if (!NT_STATUS_IS_OK(status)) {
-               smb_panic("Unable to accept messaging_rec");
-       }
-       talloc_steal(rec, rec->sock);
+       talloc_steal(rec, packet.data);
+       rec->msg           = msg;
+       rec->path          = msg->path;
+       rec->header        = (struct messaging_header *)packet.data;
+       rec->packet        = packet;
+       rec->retries       = 0;
 
-       rec->msg = msg;
-       rec->ndone = 0;
-       rec->header.length = 0;
-       rec->path = msg->path;
+       if (msize != sizeof(*rec->header) + rec->header->length) {
+               DEBUG(0,("messaging: bad message header size %d should be %d\n", 
+                        rec->header->length, (int)(msize - sizeof(*rec->header))));
+               talloc_free(rec);
+               return;
+       }
 
-       fde2.private    = rec;
-       fde2.fd         = socket_get_fd(rec->sock);
-       fde2.flags      = EVENT_FD_READ;
-       fde2.handler    = messaging_recv_handler;
+       messaging_dispatch(msg, rec);
+       talloc_free(rec);
+}
 
-       rec->fde        = event_add_fd(msg->event.ev, &fde2);
 
-       talloc_set_destructor(rec, rec_destructor);
+/*
+  handle a socket event
+*/
+static void messaging_handler(struct event_context *ev, struct fd_event *fde, 
+                             uint16_t flags, void *private)
+{
+       struct messaging_context *msg = talloc_get_type(private, 
+                                                       struct messaging_context);
+       if (flags & EVENT_FD_WRITE) {
+               messaging_send_handler(msg);
+       }
+       if (flags & EVENT_FD_READ) {
+               messaging_recv_handler(msg);
+       }
 }
 
+
 /*
   Register a dispatch function for a particular message type.
 */
-void messaging_register(struct messaging_context *msg, void *private,
-                       uint32_t msg_type, 
-                       void (*fn)(struct messaging_context *, void *, uint32_t, servid_t, DATA_BLOB *))
+NTSTATUS messaging_register(struct messaging_context *msg, void *private,
+                           uint32_t msg_type, msg_callback_t fn)
 {
        struct dispatch_fn *d;
 
-       d = talloc_p(msg, struct dispatch_fn);
+       /* possibly expand dispatch array */
+       if (msg_type >= msg->num_types) {
+               struct dispatch_fn **dp;
+               int i;
+               dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
+               NT_STATUS_HAVE_NO_MEMORY(dp);
+               msg->dispatch = dp;
+               for (i=msg->num_types;i<=msg_type;i++) {
+                       msg->dispatch[i] = NULL;
+               }
+               msg->num_types = msg_type+1;
+       }
+
+       d = talloc_zero(msg->dispatch, struct dispatch_fn);
+       NT_STATUS_HAVE_NO_MEMORY(d);
        d->msg_type = msg_type;
        d->private = private;
        d->fn = fn;
-       DLIST_ADD(msg->dispatch, d);
+
+       DLIST_ADD(msg->dispatch[msg_type], d);
+
+       return NT_STATUS_OK;
+}
+
+/*
+  register a temporary message handler. The msg_type is allocated
+  above MSG_TMP_BASE
+*/
+NTSTATUS messaging_register_tmp(struct messaging_context *msg, void *private,
+                               msg_callback_t fn, uint32_t *msg_type)
+{
+       struct dispatch_fn *d;
+       int id;
+
+       d = talloc_zero(msg->dispatch, struct dispatch_fn);
+       NT_STATUS_HAVE_NO_MEMORY(d);
+       d->private = private;
+       d->fn = fn;
+
+       id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
+       if (id == -1) {
+               talloc_free(d);
+               return NT_STATUS_TOO_MANY_CONTEXT_IDS;
+       }
+
+       d->msg_type = (uint32_t)id;
+       (*msg_type) = d->msg_type;
+
+       return NT_STATUS_OK;
 }
 
 /*
@@ -259,269 +420,688 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
 {
        struct dispatch_fn *d, *next;
 
-       for (d = msg->dispatch; d; d = next) {
+       if (msg_type >= msg->num_types) {
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+                                                  msg_type);
+               if (!d) return;
+               idr_remove(msg->dispatch_tree, msg_type);
+               talloc_free(d);
+               return;
+       }
+
+       for (d = msg->dispatch[msg_type]; d; d = next) {
                next = d->next;
-               if (d->msg_type == msg_type && 
-                   d->private == private) {
-                       DLIST_REMOVE(msg->dispatch, d);
+               if (d->private == private) {
+                       DLIST_REMOVE(msg->dispatch[msg_type], d);
                        talloc_free(d);
                }
-       }       
+       }
 }
 
-
-
 /*
-  handle IO for sending a message
+  Send a message to a particular server
 */
-static void messaging_send_handler(struct event_context *ev, struct fd_event *fde
-                                  struct timeval t, uint16_t flags)
+NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server
+                       uint32_t msg_type, DATA_BLOB *data)
 {
-       struct messaging_rec *rec = fde->private;
+       struct messaging_rec *rec;
        NTSTATUS status;
+       size_t dlength = data?data->length:0;
 
-       if (rec->ndone < sizeof(rec->header)) {
-               /* send the header */
-               size_t nsent;
-               DATA_BLOB blob;
-
-               blob.data = rec->ndone + (char *)&rec->header;
-               blob.length = sizeof(rec->header) - rec->ndone;
+       rec = talloc(msg, struct messaging_rec);
+       if (rec == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
 
-               status = socket_send(rec->sock, &blob, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
-               }
+       rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
+       if (rec->packet.data == NULL) {
+               talloc_free(rec);
+               return NT_STATUS_NO_MEMORY;
+       }
 
-               if (nsent == 0) {
-                       return;
-               }
+       rec->retries       = 0;
+       rec->msg              = msg;
+       rec->header           = (struct messaging_header *)rec->packet.data;
+       rec->header->version  = MESSAGING_VERSION;
+       rec->header->msg_type = msg_type;
+       rec->header->from     = msg->server_id;
+       rec->header->to       = server;
+       rec->header->length   = dlength;
+       if (dlength != 0) {
+               memcpy(rec->packet.data + sizeof(*rec->header), 
+                      data->data, dlength);
+       }
 
-               rec->ndone += nsent;
+       if (!cluster_node_equal(&msg->server_id, &server)) {
+               /* the destination is on another node - dispatch via
+                  the cluster layer */
+               status = cluster_message_send(server, &rec->packet);
+               talloc_free(rec);
+               return status;
        }
 
-       if (rec->ndone >= sizeof(rec->header) && 
-           rec->ndone < sizeof(rec->header) + rec->header.length) {
-               /* send the body, if any */
-               DATA_BLOB blob;
-               size_t nsent;
+       rec->path = messaging_path(msg, server);
+       talloc_steal(rec, rec->path);
 
-               blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
-               blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
+       if (msg->pending != NULL) {
+               status = STATUS_MORE_ENTRIES;
+       } else {
+               status = try_send(rec);
+       }
 
-               status = socket_send(rec->sock, &blob, &nsent, 0);
-               if (NT_STATUS_IS_ERR(status)) {
-                       talloc_free(rec);
-                       return;
+       if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+               if (msg->pending == NULL) {
+                       EVENT_FD_WRITEABLE(msg->event.fde);
                }
-
-               rec->ndone += nsent;
+               DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
+               return NT_STATUS_OK;
        }
 
-       if (rec->ndone == sizeof(rec->header) + rec->header.length) {
-               /* we've done the whole message */
-               talloc_free(rec);
-       }
-}
+       talloc_free(rec);
 
+       return status;
+}
 
 /*
-  wrapper around socket_connect with raised privileges
+  Send a message to a particular server, with the message containing a single pointer
 */
-static NTSTATUS try_connect(struct messaging_rec *rec)
+NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, 
+                           uint32_t msg_type, void *ptr)
 {
-       NTSTATUS status;
-       void *priv = root_privileges();
-       status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
-       talloc_free(priv);
-       return status;
+       DATA_BLOB blob;
+
+       blob.data = (uint8_t *)&ptr;
+       blob.length = sizeof(void *);
+
+       return messaging_send(msg, server, msg_type, &blob);
 }
 
 
 /*
-  when the servers listen queue is full we use this to backoff the message
+  destroy the messaging context
 */
-static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te, 
-                                     struct timeval t)
+static int messaging_destructor(struct messaging_context *msg)
 {
-       struct messaging_rec *rec = te->private;
-       struct messaging_context *msg = rec->msg;
+       unlink(msg->path);
+       while (msg->names && msg->names[0]) {
+               irpc_remove_name(msg, msg->names[0]);
+       }
+       return 0;
+}
+
+/*
+  create the listening socket and setup the dispatcher
+*/
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
+                                        const char *dir,
+                                        struct server_id server_id, 
+                                        struct smb_iconv_convenience *iconv_convenience,
+                                        struct event_context *ev)
+{
+       struct messaging_context *msg;
        NTSTATUS status;
-       struct fd_event fde;
+       struct socket_address *path;
 
-       status = try_connect(rec);
-       if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
-               /* backoff again */
-               te->next_event = timeval_add(&t, 0, MESSAGING_BACKOFF);
-               return;
+       msg = talloc_zero(mem_ctx, struct messaging_context);
+       if (msg == NULL) {
+               return NULL;
        }
 
+       /* setup a handler for messages from other cluster nodes, if appropriate */
+       status = cluster_message_init(msg, server_id, cluster_message_handler);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n", 
-                        rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status)));
-               talloc_free(rec);
-               return;
+               talloc_free(msg);
+               return NULL;
+       }
+
+       if (ev == NULL) {
+               ev = event_context_init(msg);
+       }
+
+       /* create the messaging directory if needed */
+       mkdir(dir, 0700);
+
+       msg->base_path     = talloc_reference(msg, dir);
+       msg->path          = messaging_path(msg, server_id);
+       msg->server_id     = server_id;
+       msg->iconv_convenience = iconv_convenience;
+       msg->idr           = idr_init(msg);
+       msg->dispatch_tree = idr_init(msg);
+       msg->start_time    = timeval_current();
+
+       status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               talloc_free(msg);
+               return NULL;
        }
 
-       fde.private     = rec;
-       fde.fd          = socket_get_fd(rec->sock);
-       fde.flags       = EVENT_FD_WRITE;
-       fde.handler     = messaging_send_handler;
+       /* by stealing here we ensure that the socket is cleaned up (and even 
+          deleted) on exit */
+       talloc_steal(msg, msg->sock);
 
-       rec->fde        = event_add_fd(msg->event.ev, &fde);
+       path = socket_address_from_strings(msg, msg->sock->backend_name, 
+                                          msg->path, 0);
+       if (!path) {
+               talloc_free(msg);
+               return NULL;
+       }
 
-       talloc_set_destructor(rec, rec_destructor);
+       status = socket_listen(msg->sock, path, 50, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
+               talloc_free(msg);
+               return NULL;
+       }
 
-       messaging_send_handler(msg->event.ev, rec->fde, timeval_zero(), EVENT_FD_WRITE);
+       /* it needs to be non blocking for sends */
+       set_blocking(socket_get_fd(msg->sock), false);
+
+       msg->event.ev   = talloc_reference(msg, ev);
+       msg->event.fde  = event_add_fd(ev, msg, socket_get_fd(msg->sock), 
+                                      EVENT_FD_READ, messaging_handler, msg);
+
+       talloc_set_destructor(msg, messaging_destructor);
+       
+       messaging_register(msg, NULL, MSG_PING, ping_message);
+       messaging_register(msg, NULL, MSG_IRPC, irpc_handler);
+       IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
+
+       return msg;
 }
 
+/* 
+   A hack, for the short term until we get 'client only' messaging in place 
+*/
+struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 
+                                               const char *dir,
+                                               struct smb_iconv_convenience *iconv_convenience,
+                                               struct event_context *ev)
+{
+       struct server_id id;
+       ZERO_STRUCT(id);
+       id.id = random() % 0x10000000;
+       return messaging_init(mem_ctx, dir, id, iconv_convenience, ev);
+}
+/*
+  a list of registered irpc server functions
+*/
+struct irpc_list {
+       struct irpc_list *next, *prev;
+       struct GUID uuid;
+       const struct ndr_interface_table *table;
+       int callnum;
+       irpc_function_t fn;
+       void *private;
+};
+
 
 /*
-  Send a message to a particular server
+  register a irpc server function
 */
-NTSTATUS messaging_send(struct messaging_context *msg, servid_t server, uint32_t msg_type, DATA_BLOB *data)
+NTSTATUS irpc_register(struct messaging_context *msg_ctx, 
+                      const struct ndr_interface_table *table, 
+                      int callnum, irpc_function_t fn, void *private)
 {
-       struct messaging_rec *rec;
-       NTSTATUS status;
-       struct fd_event fde;
+       struct irpc_list *irpc;
 
-       rec = talloc_p(msg, struct messaging_rec);
-       if (rec == NULL) {
-               return NT_STATUS_NO_MEMORY;
+       /* override an existing handler, if any */
+       for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
+               if (irpc->table == table && irpc->callnum == callnum) {
+                       break;
+               }
        }
+       if (irpc == NULL) {
+               irpc = talloc(msg_ctx, struct irpc_list);
+               NT_STATUS_HAVE_NO_MEMORY(irpc);
+               DLIST_ADD(msg_ctx->irpc, irpc);
+       }
+
+       irpc->table   = table;
+       irpc->callnum = callnum;
+       irpc->fn      = fn;
+       irpc->private = private;
+       irpc->uuid = irpc->table->syntax_id.uuid;
+
+       return NT_STATUS_OK;
+}
+
+
+/*
+  handle an incoming irpc reply message
+*/
+static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m)
+{
+       struct irpc_request *irpc;
+       enum ndr_err_code ndr_err;
+
+       irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
+       if (irpc == NULL) return;
 
-       rec->msg = msg;
-       rec->header.version = MESSAGING_VERSION;
-       rec->header.msg_type = msg_type;
-       rec->header.from = msg->server_id;
-       rec->header.to = server;
-       rec->header.length = data?data->length:0;
-       if (rec->header.length != 0) {
-               rec->data = data_blob_talloc(rec, data->data, data->length);
+       /* parse the reply data */
+       ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
+       if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               irpc->status = m->header.status;
+               talloc_steal(irpc->mem_ctx, m);
        } else {
-               rec->data = data_blob(NULL, 0);
+               irpc->status = ndr_map_error2ntstatus(ndr_err);
+               talloc_steal(irpc, m);
        }
-       rec->ndone = 0;
+       irpc->done = true;
+       if (irpc->async.fn) {
+               irpc->async.fn(irpc);
+       }
+}
 
-       status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(rec);
-               return status;
+/*
+  send a irpc reply
+*/
+NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
+{
+       struct ndr_push *push;
+       DATA_BLOB packet;
+       enum ndr_err_code ndr_err;
+
+       m->header.status = status;
+
+       /* setup the reply */
+       push = ndr_push_init_ctx(m->ndr, m->msg_ctx->iconv_convenience);
+       if (push == NULL) {
+               status = NT_STATUS_NO_MEMORY;
+               goto failed;
        }
-       talloc_steal(rec, rec->sock);
 
-       rec->path = messaging_path(rec, server);
+       m->header.flags |= IRPC_FLAG_REPLY;
 
-       status = try_connect(rec);
-       if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
-               /* backoff on this message - the servers listen queue is full */
-               struct timed_event te;
-               te.next_event = timeval_current_ofs(0, MESSAGING_BACKOFF);
-               te.handler = messaging_backoff_handler;
-               te.private = rec;
-               event_add_timed(msg->event.ev, &te);
-               return NT_STATUS_OK;
+       /* construct the packet */
+       ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               goto failed;
        }
 
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(rec);
-               return status;
+       ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               goto failed;
        }
 
-       fde.private     = rec;
-       fde.fd          = socket_get_fd(rec->sock);
-       fde.flags       = EVENT_FD_WRITE;
-       fde.handler     = messaging_send_handler;
+       /* send the reply message */
+       packet = ndr_push_blob(push);
+       status = messaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);
+       if (!NT_STATUS_IS_OK(status)) goto failed;
 
-       rec->fde        = event_add_fd(msg->event.ev, &fde);
+failed:
+       talloc_free(m);
+       return status;
+}
 
-       talloc_set_destructor(rec, rec_destructor);
+/*
+  handle an incoming irpc request message
+*/
+static void irpc_handler_request(struct messaging_context *msg_ctx, 
+                                struct irpc_message *m)
+{
+       struct irpc_list *i;
+       void *r;
+       enum ndr_err_code ndr_err;
+
+       for (i=msg_ctx->irpc; i; i=i->next) {
+               if (GUID_equal(&i->uuid, &m->header.uuid) &&
+                   i->table->syntax_id.if_version == m->header.if_version &&
+                   i->callnum == m->header.callnum) {
+                       break;
+               }
+       }
 
-       messaging_send_handler(msg->event.ev, rec->fde, timeval_zero(), EVENT_FD_WRITE);
+       if (i == NULL) {
+               /* no registered handler for this message */
+               talloc_free(m);
+               return;
+       }
 
-       return NT_STATUS_OK;
+       /* allocate space for the structure */
+       r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size);
+       if (r == NULL) goto failed;
+
+       /* parse the request data */
+       ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
+
+       /* make the call */
+       m->private     = i->private;
+       m->defer_reply = false;
+       m->msg_ctx     = msg_ctx;
+       m->irpc        = i;
+       m->data        = r;
+       m->ev          = msg_ctx->event.ev;
+
+       m->header.status = i->fn(m, r);
+
+       if (m->defer_reply) {
+               /* the server function has asked to defer the reply to later */
+               talloc_steal(msg_ctx, m);
+               return;
+       }
+
+       irpc_send_reply(m, m->header.status);
+       return;
+
+failed:
+       talloc_free(m);
 }
 
 /*
-  Send a message to a particular server, with the message containing a single pointer
+  handle an incoming irpc message
 */
-NTSTATUS messaging_send_ptr(struct messaging_context *msg, servid_t server
-                           uint32_t msg_type, void *ptr)
+static void irpc_handler(struct messaging_context *msg_ctx, void *private
+                        uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
 {
-       DATA_BLOB blob;
+       struct irpc_message *m;
+       enum ndr_err_code ndr_err;
 
-       blob.data = (void *)&ptr;
-       blob.length = sizeof(void *);
+       m = talloc(msg_ctx, struct irpc_message);
+       if (m == NULL) goto failed;
 
-       return messaging_send(msg, server, msg_type, &blob);
+       m->from = src;
+
+       m->ndr = ndr_pull_init_blob(packet, m, msg_ctx->iconv_convenience);
+       if (m->ndr == NULL) goto failed;
+
+       m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
+
+       ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
+
+       if (m->header.flags & IRPC_FLAG_REPLY) {
+               irpc_handler_reply(msg_ctx, m);
+       } else {
+               irpc_handler_request(msg_ctx, m);
+       }
+       return;
+
+failed:
+       talloc_free(m);
 }
 
 
 /*
-  destroy the messaging context
+  destroy a irpc request
 */
-static int messaging_destructor(void *ptr)
+static int irpc_destructor(struct irpc_request *irpc)
 {
-       struct messaging_context *msg = ptr;
-       event_remove_fd(msg->event.ev, msg->event.fde);
-       unlink(msg->path);
+       if (irpc->callid != -1) {
+               idr_remove(irpc->msg_ctx->idr, irpc->callid);
+               irpc->callid = -1;
+       }
+
+       if (irpc->reject_free) {
+               return -1;
+       }
        return 0;
 }
 
 /*
-  create the listening socket and setup the dispatcher
+  timeout a irpc request
 */
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, servid_t server_id, struct event_context *ev)
+static void irpc_timeout(struct event_context *ev, struct timed_event *te, 
+                        struct timeval t, void *private)
 {
-       struct messaging_context *msg;
+       struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
+       irpc->status = NT_STATUS_IO_TIMEOUT;
+       irpc->done = true;
+       if (irpc->async.fn) {
+               irpc->async.fn(irpc);
+       }
+}
+
+
+/*
+  make a irpc call - async send
+*/
+struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, 
+                                   struct server_id server_id, 
+                                   const struct ndr_interface_table *table, 
+                                   int callnum, void *r, TALLOC_CTX *ctx)
+{
+       struct irpc_header header;
+       struct ndr_push *ndr;
        NTSTATUS status;
-       struct fd_event fde;
+       DATA_BLOB packet;
+       struct irpc_request *irpc;
+       enum ndr_err_code ndr_err;
+
+       irpc = talloc(msg_ctx, struct irpc_request);
+       if (irpc == NULL) goto failed;
+
+       irpc->msg_ctx  = msg_ctx;
+       irpc->table    = table;
+       irpc->callnum  = callnum;
+       irpc->callid   = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
+       if (irpc->callid == -1) goto failed;
+       irpc->r        = r;
+       irpc->done     = false;
+       irpc->async.fn = NULL;
+       irpc->mem_ctx  = ctx;
+       irpc->reject_free = false;
+
+       talloc_set_destructor(irpc, irpc_destructor);
+
+       /* setup the header */
+       header.uuid = table->syntax_id.uuid;
+
+       header.if_version = table->syntax_id.if_version;
+       header.callid     = irpc->callid;
+       header.callnum    = callnum;
+       header.flags      = 0;
+       header.status     = NT_STATUS_OK;
+
+       /* construct the irpc packet */
+       ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience);
+       if (ndr == NULL) goto failed;
+
+       ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
+
+       ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
+
+       /* and send it */
+       packet = ndr_push_blob(ndr);
+       status = messaging_send(msg_ctx, server_id, MSG_IRPC, &packet);
+       if (!NT_STATUS_IS_OK(status)) goto failed;
+
+       event_add_timed(msg_ctx->event.ev, irpc, 
+                       timeval_current_ofs(IRPC_CALL_TIMEOUT, 0), 
+                       irpc_timeout, irpc);
+
+       talloc_free(ndr);
+       return irpc;
+
+failed:
+       talloc_free(irpc);
+       return NULL;
+}
 
-       msg = talloc_p(mem_ctx, struct messaging_context);
-       if (msg == NULL) {
-               return NULL;
+/*
+  wait for a irpc reply
+*/
+NTSTATUS irpc_call_recv(struct irpc_request *irpc)
+{
+       NTSTATUS status;
+
+       NT_STATUS_HAVE_NO_MEMORY(irpc);
+
+       irpc->reject_free = true;
+
+       while (!irpc->done) {
+               if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
+                       return NT_STATUS_CONNECTION_DISCONNECTED;
+               }
        }
 
-       /* create the messaging directory if needed */
-       msg->path = smbd_tmp_path(msg, "messaging");
-       mkdir(msg->path, 0700);
-       talloc_free(msg->path);
+       irpc->reject_free = false;
 
-       msg->server_id = server_id;
-       msg->dispatch = NULL;
-       msg->path = messaging_path(msg, server_id);
+       status = irpc->status;
+       talloc_free(irpc);
+       return status;
+}
 
-       status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(msg);
+/*
+  perform a synchronous irpc request
+*/
+NTSTATUS irpc_call(struct messaging_context *msg_ctx, 
+                  struct server_id server_id, 
+                  const struct ndr_interface_table *table, 
+                  int callnum, void *r,
+                  TALLOC_CTX *mem_ctx)
+{
+       struct irpc_request *irpc = irpc_call_send(msg_ctx, server_id, 
+                                                  table, callnum, r, mem_ctx);
+       return irpc_call_recv(irpc);
+}
+
+/*
+  open the naming database
+*/
+static struct tdb_wrap *irpc_namedb_open(struct messaging_context *msg_ctx)
+{
+       struct tdb_wrap *t;
+       char *path = talloc_asprintf(msg_ctx, "%s/names.tdb", msg_ctx->base_path);
+       if (path == NULL) {
                return NULL;
        }
+       t = tdb_wrap_open(msg_ctx, path, 0, 0, O_RDWR|O_CREAT, 0660);
+       talloc_free(path);
+       return t;
+}
+       
 
-       /* by stealing here we ensure that the socket is cleaned up (and even 
-          deleted) on exit */
-       talloc_steal(msg, msg->sock);
+/*
+  add a string name that this irpc server can be called on
+*/
+NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
+{
+       struct tdb_wrap *t;
+       TDB_DATA rec;
+       int count;
+       NTSTATUS status = NT_STATUS_OK;
 
-       status = socket_listen(msg->sock, msg->path, 0, 50, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0,("Unable to setup messaging listener for '%s'\n", msg->path));
-               talloc_free(msg);
-               return NULL;
+       t = irpc_namedb_open(msg_ctx);
+       NT_STATUS_HAVE_NO_MEMORY(t);
+
+       if (tdb_lock_bystring(t->tdb, name) != 0) {
+               talloc_free(t);
+               return NT_STATUS_LOCK_NOT_GRANTED;
        }
+       rec = tdb_fetch_bystring(t->tdb, name);
+       count = rec.dsize / sizeof(struct server_id);
+       rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
+       rec.dsize += sizeof(struct server_id);
+       if (rec.dptr == NULL) {
+               tdb_unlock_bystring(t->tdb, name);
+               talloc_free(t);
+               return NT_STATUS_NO_MEMORY;
+       }
+       ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
+       if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
+               status = NT_STATUS_INTERNAL_ERROR;
+       }
+       free(rec.dptr);
+       tdb_unlock_bystring(t->tdb, name);
+       talloc_free(t);
 
-       fde.private     = msg;
-       fde.fd          = socket_get_fd(msg->sock);
-       fde.flags       = EVENT_FD_READ;
-       fde.handler     = messaging_listen_handler;
+       msg_ctx->names = str_list_add(msg_ctx->names, name);
+       talloc_steal(msg_ctx, msg_ctx->names);
 
-       msg->event.ev   = ev;
-       msg->event.fde  = event_add_fd(ev, &fde);
+       return status;
+}
 
-       talloc_set_destructor(msg, messaging_destructor);
-       
-       messaging_register(msg, NULL, MSG_PING, ping_message);
+/*
+  return a list of server ids for a server name
+*/
+struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
+                                     TALLOC_CTX *mem_ctx,
+                                     const char *name)
+{
+       struct tdb_wrap *t;
+       TDB_DATA rec;
+       int count, i;
+       struct server_id *ret;
 
-       return msg;
+       t = irpc_namedb_open(msg_ctx);
+       if (t == NULL) {
+               return NULL;
+       }
+
+       if (tdb_lock_bystring(t->tdb, name) != 0) {
+               talloc_free(t);
+               return NULL;
+       }
+       rec = tdb_fetch_bystring(t->tdb, name);
+       if (rec.dptr == NULL) {
+               tdb_unlock_bystring(t->tdb, name);
+               talloc_free(t);
+               return NULL;
+       }
+       count = rec.dsize / sizeof(struct server_id);
+       ret = talloc_array(mem_ctx, struct server_id, count+1);
+       if (ret == NULL) {
+               tdb_unlock_bystring(t->tdb, name);
+               talloc_free(t);
+               return NULL;
+       }
+       for (i=0;i<count;i++) {
+               ret[i] = ((struct server_id *)rec.dptr)[i];
+       }
+       ret[i] = cluster_id(0, 0);
+       free(rec.dptr);
+       tdb_unlock_bystring(t->tdb, name);
+       talloc_free(t);
+
+       return ret;
 }
 
+/*
+  remove a name from a messaging context
+*/
+void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
+{
+       struct tdb_wrap *t;
+       TDB_DATA rec;
+       int count, i;
+       struct server_id *ids;
+
+       str_list_remove(msg_ctx->names, name);
 
+       t = irpc_namedb_open(msg_ctx);
+       if (t == NULL) {
+               return;
+       }
+
+       if (tdb_lock_bystring(t->tdb, name) != 0) {
+               talloc_free(t);
+               return;
+       }
+       rec = tdb_fetch_bystring(t->tdb, name);
+       count = rec.dsize / sizeof(struct server_id);
+       if (count == 0) {
+               tdb_unlock_bystring(t->tdb, name);
+               talloc_free(t);
+               return;
+       }
+       ids = (struct server_id *)rec.dptr;
+       for (i=0;i<count;i++) {
+               if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
+                       if (i < count-1) {
+                               memmove(ids+i, ids+i+1, 
+                                       sizeof(struct server_id) * (count-(i+1)));
+                       }
+                       rec.dsize -= sizeof(struct server_id);
+                       break;
+               }
+       }
+       tdb_store_bystring(t->tdb, name, rec, 0);
+       free(rec.dptr);
+       tdb_unlock_bystring(t->tdb, name);
+       talloc_free(t);
+}