r26003: Split up DB_WRAP, as first step in an attempt to sanitize dependencies.
[jelmer/samba4-debian.git] / source / lib / messaging / messaging.c
index bf06d68a3386911e66d8e89db81d12b2ff3b35dd..df0bfa32a69f5c03344782a104948212e19f0812 100644 (file)
@@ -7,7 +7,7 @@
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
+   the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
    
    This program is distributed in the hope that it will be useful,
    GNU General Public License for more details.
    
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "includes.h"
 #include "lib/events/events.h"
 #include "system/filesys.h"
 #include "messaging/messaging.h"
-#include "dlinklist.h"
+#include "lib/util/dlinklist.h"
 #include "lib/socket/socket.h"
 #include "librpc/gen_ndr/ndr_irpc.h"
 #include "lib/messaging/irpc.h"
-#include "db_wrap.h"
-#include "lib/tdb/include/tdbutil.h"
+#include "tdb_wrap.h"
 #include "lib/util/unix_privs.h"
 #include "librpc/rpc/dcerpc.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/util/util_tdb.h"
+#include "lib/util/util_tdb.h"
+#include "cluster/cluster.h"
+#include "param/param.h"
 
 /* change the message version with any incompatible changes in the protocol */
 #define MESSAGING_VERSION 1
 
 struct messaging_context {
-       uint32_t server_id;
+       struct server_id server_id;
        struct socket_context *sock;
        const char *base_path;
        const char *path;
@@ -45,11 +48,12 @@ struct messaging_context {
        uint32_t num_types;
        struct idr_context *dispatch_tree;
        struct messaging_rec *pending;
+       struct messaging_rec *retry_queue;
        struct irpc_list *irpc;
        struct idr_context *idr;
        const char **names;
        struct timeval start_time;
-
+       struct timed_event *retry_te;
        struct {
                struct event_context *ev;
                struct fd_event *fde;
@@ -74,27 +78,28 @@ struct messaging_rec {
        struct messaging_header {
                uint32_t version;
                uint32_t msg_type;
-               uint32_t from;
-               uint32_t to;
+               struct server_id from;
+               struct server_id to;
                uint32_t length;
        } *header;
 
        DATA_BLOB packet;
+       uint32_t retries;
 };
 
 
 static void irpc_handler(struct messaging_context *, void *, 
-                        uint32_t, uint32_t, DATA_BLOB *);
+                        uint32_t, struct server_id, DATA_BLOB *);
 
 
 /*
  A useful function for testing the message system.
 */
 static void ping_message(struct messaging_context *msg, void *private, 
-                        uint32_t msg_type, uint32_t src, DATA_BLOB *data)
+                        uint32_t msg_type, struct server_id src, DATA_BLOB *data)
 {
-       DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
-                (uint_t)src, (int)data->length, 
+       DEBUG(1,("INFO: Received PING message from server %u.%u [%.*s]\n",
+                (uint_t)src.node, (uint_t)src.id, (int)data->length, 
                 data->data?(const char *)data->data:""));
        messaging_send(msg, src, MSG_PONG, data);
 }
@@ -113,9 +118,10 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg,
 /* 
    return the path to a messaging socket
 */
-static char *messaging_path(struct messaging_context *msg, uint32_t server_id)
+static char *messaging_path(struct messaging_context *msg, struct server_id server_id)
 {
-       return talloc_asprintf(msg, "%s/msg.%u", msg->base_path, (unsigned)server_id);
+       return talloc_asprintf(msg, "%s/msg.%u.%u", msg->base_path, 
+                              (unsigned)server_id.node, (unsigned)server_id.id);
 }
 
 /*
@@ -131,7 +137,8 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
 
        /* temporary IDs use an idtree, the rest use a array of pointers */
        if (rec->header->msg_type >= MSG_TMP_BASE) {
-               d = idr_find(msg->dispatch_tree, rec->header->msg_type);
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+                                                  rec->header->msg_type);
        } else if (rec->header->msg_type < msg->num_types) {
                d = msg->dispatch[rec->header->msg_type];
        } else {
@@ -148,6 +155,36 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
        rec->header->length = 0;
 }
 
+/*
+  handler for messages that arrive from other nodes in the cluster
+*/
+static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet)
+{
+       struct messaging_rec *rec;
+
+       rec = talloc(msg, struct messaging_rec);
+       if (rec == NULL) {
+               smb_panic("Unable to allocate messaging_rec");
+       }
+
+       rec->msg           = msg;
+       rec->path          = msg->path;
+       rec->header        = (struct messaging_header *)packet.data;
+       rec->packet        = packet;
+       rec->retries       = 0;
+
+       if (packet.length != sizeof(*rec->header) + rec->header->length) {
+               DEBUG(0,("messaging: bad message header size %d should be %d\n", 
+                        rec->header->length, (int)(packet.length - sizeof(*rec->header))));
+               talloc_free(rec);
+               return;
+       }
+
+       messaging_dispatch(msg, rec);
+       talloc_free(rec);
+}
+
+
 
 /*
   try to send the message
@@ -177,6 +214,26 @@ static NTSTATUS try_send(struct messaging_rec *rec)
        return status;
 }
 
+/*
+  retry backed off messages
+*/
+static void msg_retry_timer(struct event_context *ev, struct timed_event *te, 
+                           struct timeval t, void *private)
+{
+       struct messaging_context *msg = talloc_get_type(private, 
+                                                       struct messaging_context);
+       msg->retry_te = NULL;
+
+       /* put the messages back on the main queue */
+       while (msg->retry_queue) {
+               struct messaging_rec *rec = msg->retry_queue;
+               DLIST_REMOVE(msg->retry_queue, rec);
+               DLIST_ADD_END(msg->pending, rec, struct messaging_rec *);
+       }
+
+       EVENT_FD_WRITEABLE(msg->event.fde);     
+}
+
 /*
   handle a socket write event
 */
@@ -187,11 +244,26 @@ static void messaging_send_handler(struct messaging_context *msg)
                NTSTATUS status;
                status = try_send(rec);
                if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
+                       rec->retries++;
+                       if (rec->retries > 3) {
+                               /* we're getting continuous write errors -
+                                  backoff this record */
+                               DLIST_REMOVE(msg->pending, rec);
+                               DLIST_ADD_END(msg->retry_queue, rec, 
+                                             struct messaging_rec *);
+                               if (msg->retry_te == NULL) {
+                                       msg->retry_te = 
+                                               event_add_timed(msg->event.ev, msg, 
+                                                               timeval_current_ofs(1, 0), 
+                                                               msg_retry_timer, msg);
+                               }
+                       }
                        break;
                }
+               rec->retries = 0;
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", 
-                                rec->header->from, rec->header->to, rec->header->msg_type, 
+                                rec->header->from.id, rec->header->to.id, rec->header->msg_type, 
                                 nt_errstr(status)));
                }
                DLIST_REMOVE(msg->pending, rec);
@@ -248,6 +320,7 @@ static void messaging_recv_handler(struct messaging_context *msg)
        rec->path          = msg->path;
        rec->header        = (struct messaging_header *)packet.data;
        rec->packet        = packet;
+       rec->retries       = 0;
 
        if (msize != sizeof(*rec->header) + rec->header->length) {
                DEBUG(0,("messaging: bad message header size %d should be %d\n", 
@@ -345,7 +418,8 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
        struct dispatch_fn *d, *next;
 
        if (msg_type >= msg->num_types) {
-               d = idr_find(msg->dispatch_tree, msg_type);
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+                                                  msg_type);
                if (!d) return;
                idr_remove(msg->dispatch_tree, msg_type);
                talloc_free(d);
@@ -364,7 +438,7 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
 /*
   Send a message to a particular server
 */
-NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, 
+NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, 
                        uint32_t msg_type, DATA_BLOB *data)
 {
        struct messaging_rec *rec;
@@ -382,6 +456,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
                return NT_STATUS_NO_MEMORY;
        }
 
+       rec->retries       = 0;
        rec->msg              = msg;
        rec->header           = (struct messaging_header *)rec->packet.data;
        rec->header->version  = MESSAGING_VERSION;
@@ -394,6 +469,14 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
                       data->data, dlength);
        }
 
+       if (!cluster_node_equal(&msg->server_id, &server)) {
+               /* the destination is on another node - dispatch via
+                  the cluster layer */
+               status = cluster_message_send(server, &rec->packet);
+               talloc_free(rec);
+               return status;
+       }
+
        rec->path = messaging_path(msg, server);
        talloc_steal(rec, rec->path);
 
@@ -419,12 +502,12 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server,
 /*
   Send a message to a particular server, with the message containing a single pointer
 */
-NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server, 
+NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id server, 
                            uint32_t msg_type, void *ptr)
 {
        DATA_BLOB blob;
 
-       blob.data = (void *)&ptr;
+       blob.data = (uint8_t *)&ptr;
        blob.length = sizeof(void *);
 
        return messaging_send(msg, server, msg_type, &blob);
@@ -434,9 +517,8 @@ NTSTATUS messaging_send_ptr(struct messaging_context *msg, uint32_t server,
 /*
   destroy the messaging context
 */
-static int messaging_destructor(void *ptr)
+static int messaging_destructor(struct messaging_context *msg)
 {
-       struct messaging_context *msg = ptr;
        unlink(msg->path);
        while (msg->names && msg->names[0]) {
                irpc_remove_name(msg, msg->names[0]);
@@ -447,29 +529,35 @@ static int messaging_destructor(void *ptr)
 /*
   create the listening socket and setup the dispatcher
 */
-struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id, 
+struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
+                                        const char *dir,
+                                        struct server_id server_id, 
                                         struct event_context *ev)
 {
        struct messaging_context *msg;
        NTSTATUS status;
        struct socket_address *path;
-       char *dir;
 
        msg = talloc_zero(mem_ctx, struct messaging_context);
        if (msg == NULL) {
                return NULL;
        }
 
+       /* setup a handler for messages from other cluster nodes, if appropriate */
+       status = cluster_message_init(msg, server_id, cluster_message_handler);
+       if (!NT_STATUS_IS_OK(status)) {
+               talloc_free(msg);
+               return NULL;
+       }
+
        if (ev == NULL) {
                ev = event_context_init(msg);
        }
 
        /* create the messaging directory if needed */
-       dir = smbd_tmp_path(msg, "messaging");
        mkdir(dir, 0700);
-       talloc_free(dir);
 
-       msg->base_path     = smbd_tmp_path(msg, "messaging");
+       msg->base_path     = talloc_reference(msg, dir);
        msg->path          = messaging_path(msg, server_id);
        msg->server_id     = server_id;
        msg->idr           = idr_init(msg);
@@ -501,7 +589,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
        }
 
        /* it needs to be non blocking for sends */
-       set_blocking(socket_get_fd(msg->sock), False);
+       set_blocking(socket_get_fd(msg->sock), false);
 
        msg->event.ev   = talloc_reference(msg, ev);
        msg->event.fde  = event_add_fd(ev, msg, socket_get_fd(msg->sock), 
@@ -520,9 +608,13 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id
    A hack, for the short term until we get 'client only' messaging in place 
 */
 struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 
+                                               const char *dir,
                                                struct event_context *ev)
 {
-       return messaging_init(mem_ctx, random() % 0x10000000, ev);
+       struct server_id id;
+       ZERO_STRUCT(id);
+       id.id = random() % 0x10000000;
+       return messaging_init(mem_ctx, dir, id, ev);
 }
 /*
   a list of registered irpc server functions
@@ -530,7 +622,7 @@ struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx,
 struct irpc_list {
        struct irpc_list *next, *prev;
        struct GUID uuid;
-       const struct dcerpc_interface_table *table;
+       const struct ndr_interface_table *table;
        int callnum;
        irpc_function_t fn;
        void *private;
@@ -541,7 +633,7 @@ struct irpc_list {
   register a irpc server function
 */
 NTSTATUS irpc_register(struct messaging_context *msg_ctx, 
-                      const struct dcerpc_interface_table *table, 
+                      const struct ndr_interface_table *table, 
                       int callnum, irpc_function_t fn, void *private)
 {
        struct irpc_list *irpc;
@@ -574,19 +666,21 @@ NTSTATUS irpc_register(struct messaging_context *msg_ctx,
 static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m)
 {
        struct irpc_request *irpc;
+       enum ndr_err_code ndr_err;
 
-       irpc = idr_find(msg_ctx->idr, m->header.callid);
+       irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
        if (irpc == NULL) return;
 
        /* parse the reply data */
-       irpc->status = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
-       if (NT_STATUS_IS_OK(irpc->status)) {
+       ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
+       if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                irpc->status = m->header.status;
                talloc_steal(irpc->mem_ctx, m);
        } else {
+               irpc->status = ndr_map_error2ntstatus(ndr_err);
                talloc_steal(irpc, m);
        }
-       irpc->done = True;
+       irpc->done = true;
        if (irpc->async.fn) {
                irpc->async.fn(irpc);
        }
@@ -599,6 +693,7 @@ NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
 {
        struct ndr_push *push;
        DATA_BLOB packet;
+       enum ndr_err_code ndr_err;
 
        m->header.status = status;
 
@@ -612,11 +707,17 @@ NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
        m->header.flags |= IRPC_FLAG_REPLY;
 
        /* construct the packet */
-       status = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               goto failed;
+       }
 
-       status = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               goto failed;
+       }
 
        /* send the reply message */
        packet = ndr_push_blob(push);
@@ -636,7 +737,7 @@ static void irpc_handler_request(struct messaging_context *msg_ctx,
 {
        struct irpc_list *i;
        void *r;
-       NTSTATUS status;
+       enum ndr_err_code ndr_err;
 
        for (i=msg_ctx->irpc; i; i=i->next) {
                if (GUID_equal(&i->uuid, &m->header.uuid) &&
@@ -657,12 +758,12 @@ static void irpc_handler_request(struct messaging_context *msg_ctx,
        if (r == NULL) goto failed;
 
        /* parse the request data */
-       status = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
        /* make the call */
        m->private     = i->private;
-       m->defer_reply = False;
+       m->defer_reply = false;
        m->msg_ctx     = msg_ctx;
        m->irpc        = i;
        m->data        = r;
@@ -687,10 +788,10 @@ failed:
   handle an incoming irpc message
 */
 static void irpc_handler(struct messaging_context *msg_ctx, void *private, 
-                        uint32_t msg_type, uint32_t src, DATA_BLOB *packet)
+                        uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
 {
        struct irpc_message *m;
-       NTSTATUS status;
+       enum ndr_err_code ndr_err;
 
        m = talloc(msg_ctx, struct irpc_message);
        if (m == NULL) goto failed;
@@ -702,8 +803,8 @@ static void irpc_handler(struct messaging_context *msg_ctx, void *private,
 
        m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
 
-       status = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
        if (m->header.flags & IRPC_FLAG_REPLY) {
                irpc_handler_reply(msg_ctx, m);
@@ -720,10 +821,16 @@ failed:
 /*
   destroy a irpc request
 */
-static int irpc_destructor(void *ptr)
+static int irpc_destructor(struct irpc_request *irpc)
 {
-       struct irpc_request *irpc = talloc_get_type(ptr, struct irpc_request);
-       idr_remove(irpc->msg_ctx->idr, irpc->callid);
+       if (irpc->callid != -1) {
+               idr_remove(irpc->msg_ctx->idr, irpc->callid);
+               irpc->callid = -1;
+       }
+
+       if (irpc->reject_free) {
+               return -1;
+       }
        return 0;
 }
 
@@ -735,7 +842,7 @@ static void irpc_timeout(struct event_context *ev, struct timed_event *te,
 {
        struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
        irpc->status = NT_STATUS_IO_TIMEOUT;
-       irpc->done = True;
+       irpc->done = true;
        if (irpc->async.fn) {
                irpc->async.fn(irpc);
        }
@@ -746,8 +853,8 @@ static void irpc_timeout(struct event_context *ev, struct timed_event *te,
   make a irpc call - async send
 */
 struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx, 
-                                   uint32_t server_id, 
-                                   const struct dcerpc_interface_table *table, 
+                                   struct server_id server_id, 
+                                   const struct ndr_interface_table *table, 
                                    int callnum, void *r, TALLOC_CTX *ctx)
 {
        struct irpc_header header;
@@ -755,6 +862,7 @@ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
        NTSTATUS status;
        DATA_BLOB packet;
        struct irpc_request *irpc;
+       enum ndr_err_code ndr_err;
 
        irpc = talloc(msg_ctx, struct irpc_request);
        if (irpc == NULL) goto failed;
@@ -765,9 +873,10 @@ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
        irpc->callid   = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
        if (irpc->callid == -1) goto failed;
        irpc->r        = r;
-       irpc->done     = False;
+       irpc->done     = false;
        irpc->async.fn = NULL;
        irpc->mem_ctx  = ctx;
+       irpc->reject_free = false;
 
        talloc_set_destructor(irpc, irpc_destructor);
 
@@ -784,11 +893,11 @@ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
        ndr = ndr_push_init_ctx(irpc);
        if (ndr == NULL) goto failed;
 
-       status = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
-       status = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
        /* and send it */
        packet = ndr_push_blob(ndr);
@@ -816,11 +925,16 @@ NTSTATUS irpc_call_recv(struct irpc_request *irpc)
 
        NT_STATUS_HAVE_NO_MEMORY(irpc);
 
+       irpc->reject_free = true;
+
        while (!irpc->done) {
                if (event_loop_once(irpc->msg_ctx->event.ev) != 0) {
                        return NT_STATUS_CONNECTION_DISCONNECTED;
                }
        }
+
+       irpc->reject_free = false;
+
        status = irpc->status;
        talloc_free(irpc);
        return status;
@@ -830,8 +944,8 @@ NTSTATUS irpc_call_recv(struct irpc_request *irpc)
   perform a synchronous irpc request
 */
 NTSTATUS irpc_call(struct messaging_context *msg_ctx, 
-                  uint32_t server_id, 
-                  const struct dcerpc_interface_table *table, 
+                  struct server_id server_id, 
+                  const struct ndr_interface_table *table, 
                   int callnum, void *r,
                   TALLOC_CTX *mem_ctx)
 {
@@ -874,15 +988,15 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
                return NT_STATUS_LOCK_NOT_GRANTED;
        }
        rec = tdb_fetch_bystring(t->tdb, name);
-       count = rec.dsize / sizeof(uint32_t);
-       rec.dptr = (unsigned char *)realloc_p(rec.dptr, uint32_t, count+1);
-       rec.dsize += sizeof(uint32_t);
+       count = rec.dsize / sizeof(struct server_id);
+       rec.dptr = (unsigned char *)realloc_p(rec.dptr, struct server_id, count+1);
+       rec.dsize += sizeof(struct server_id);
        if (rec.dptr == NULL) {
                tdb_unlock_bystring(t->tdb, name);
                talloc_free(t);
                return NT_STATUS_NO_MEMORY;
        }
-       ((uint32_t *)rec.dptr)[count] = msg_ctx->server_id;
+       ((struct server_id *)rec.dptr)[count] = msg_ctx->server_id;
        if (tdb_store_bystring(t->tdb, name, rec, 0) != 0) {
                status = NT_STATUS_INTERNAL_ERROR;
        }
@@ -899,12 +1013,14 @@ NTSTATUS irpc_add_name(struct messaging_context *msg_ctx, const char *name)
 /*
   return a list of server ids for a server name
 */
-uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *name)
+struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
+                                     TALLOC_CTX *mem_ctx,
+                                     const char *name)
 {
        struct tdb_wrap *t;
        TDB_DATA rec;
        int count, i;
-       uint32_t *ret;
+       struct server_id *ret;
 
        t = irpc_namedb_open(msg_ctx);
        if (t == NULL) {
@@ -921,17 +1037,17 @@ uint32_t *irpc_servers_byname(struct messaging_context *msg_ctx, const char *nam
                talloc_free(t);
                return NULL;
        }
-       count = rec.dsize / sizeof(uint32_t);
-       ret = talloc_array(msg_ctx, uint32_t, count+1);
+       count = rec.dsize / sizeof(struct server_id);
+       ret = talloc_array(mem_ctx, struct server_id, count+1);
        if (ret == NULL) {
                tdb_unlock_bystring(t->tdb, name);
                talloc_free(t);
                return NULL;
        }
        for (i=0;i<count;i++) {
-               ret[i] = ((uint32_t *)rec.dptr)[i];
+               ret[i] = ((struct server_id *)rec.dptr)[i];
        }
-       ret[i] = 0;
+       ret[i] = cluster_id(0);
        free(rec.dptr);
        tdb_unlock_bystring(t->tdb, name);
        talloc_free(t);
@@ -947,7 +1063,7 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
        struct tdb_wrap *t;
        TDB_DATA rec;
        int count, i;
-       uint32_t *ids;
+       struct server_id *ids;
 
        str_list_remove(msg_ctx->names, name);
 
@@ -961,19 +1077,20 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
                return;
        }
        rec = tdb_fetch_bystring(t->tdb, name);
-       count = rec.dsize / sizeof(uint32_t);
+       count = rec.dsize / sizeof(struct server_id);
        if (count == 0) {
                tdb_unlock_bystring(t->tdb, name);
                talloc_free(t);
                return;
        }
-       ids = (uint32_t *)rec.dptr;
+       ids = (struct server_id *)rec.dptr;
        for (i=0;i<count;i++) {
-               if (ids[i] == msg_ctx->server_id) {
+               if (cluster_id_equal(&ids[i], &msg_ctx->server_id)) {
                        if (i < count-1) {
-                               memmove(ids+i, ids+i+1, count-(i+1));
+                               memmove(ids+i, ids+i+1, 
+                                       sizeof(struct server_id) * (count-(i+1)));
                        }
-                       rec.dsize -= sizeof(uint32_t);
+                       rec.dsize -= sizeof(struct server_id);
                        break;
                }
        }