Merge branch 'master' of ssh://git.samba.org/data/git/samba into noejs
[sfrench/samba-autobuild/.git] / source4 / lib / messaging / messaging.c
index 24b5ff408df740cf99b412580f8e866bf1a29f69..2125ba1fe6b88803a31bdf5978ad59d97218579c 100644 (file)
 #include "lib/socket/socket.h"
 #include "librpc/gen_ndr/ndr_irpc.h"
 #include "lib/messaging/irpc.h"
-#include "db_wrap.h"
+#include "tdb_wrap.h"
 #include "lib/util/unix_privs.h"
 #include "librpc/rpc/dcerpc.h"
-#include "lib/tdb/include/tdb.h"
-#include "lib/util/util_tdb.h"
+#include "../tdb/include/tdb.h"
 #include "lib/util/util_tdb.h"
 #include "cluster/cluster.h"
+#include "param/param.h"
 
 /* change the message version with any incompatible changes in the protocol */
 #define MESSAGING_VERSION 1
@@ -48,6 +48,7 @@ struct messaging_context {
        struct idr_context *dispatch_tree;
        struct messaging_rec *pending;
        struct messaging_rec *retry_queue;
+       struct smb_iconv_convenience *iconv_convenience;
        struct irpc_list *irpc;
        struct idr_context *idr;
        const char **names;
@@ -119,8 +120,8 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg,
 */
 static char *messaging_path(struct messaging_context *msg, struct server_id server_id)
 {
-       return talloc_asprintf(msg, "%s/msg.%u.%u", msg->base_path, 
-                              (unsigned)server_id.node, (unsigned)server_id.id);
+       return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, 
+                              cluster_id_string(msg, server_id));
 }
 
 /*
@@ -136,7 +137,8 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
 
        /* temporary IDs use an idtree, the rest use a array of pointers */
        if (rec->header->msg_type >= MSG_TMP_BASE) {
-               d = idr_find(msg->dispatch_tree, rec->header->msg_type);
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+                                                  rec->header->msg_type);
        } else if (rec->header->msg_type < msg->num_types) {
                d = msg->dispatch[rec->header->msg_type];
        } else {
@@ -260,8 +262,10 @@ static void messaging_send_handler(struct messaging_context *msg)
                }
                rec->retries = 0;
                if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", 
-                                rec->header->from.id, rec->header->to.id, rec->header->msg_type, 
+                       DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
+                                cluster_id_string(debug_ctx(), rec->header->from), 
+                                cluster_id_string(debug_ctx(), rec->header->to), 
+                                rec->header->msg_type, 
                                 nt_errstr(status)));
                }
                DLIST_REMOVE(msg->pending, rec);
@@ -416,7 +420,8 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void
        struct dispatch_fn *d, *next;
 
        if (msg_type >= msg->num_types) {
-               d = idr_find(msg->dispatch_tree, msg_type);
+               d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, 
+                                                  msg_type);
                if (!d) return;
                idr_remove(msg->dispatch_tree, msg_type);
                talloc_free(d);
@@ -456,6 +461,8 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
        rec->retries       = 0;
        rec->msg              = msg;
        rec->header           = (struct messaging_header *)rec->packet.data;
+       /* zero padding */
+       ZERO_STRUCTP(rec->header);
        rec->header->version  = MESSAGING_VERSION;
        rec->header->msg_type = msg_type;
        rec->header->from     = msg->server_id;
@@ -504,7 +511,7 @@ NTSTATUS messaging_send_ptr(struct messaging_context *msg, struct server_id serv
 {
        DATA_BLOB blob;
 
-       blob.data = (void *)&ptr;
+       blob.data = (uint8_t *)&ptr;
        blob.length = sizeof(void *);
 
        return messaging_send(msg, server, msg_type, &blob);
@@ -527,13 +534,18 @@ static int messaging_destructor(struct messaging_context *msg)
   create the listening socket and setup the dispatcher
 */
 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
+                                        const char *dir,
                                         struct server_id server_id, 
+                                        struct smb_iconv_convenience *iconv_convenience,
                                         struct event_context *ev)
 {
        struct messaging_context *msg;
        NTSTATUS status;
        struct socket_address *path;
-       char *dir;
+
+       if (ev == NULL) {
+               return NULL;
+       }
 
        msg = talloc_zero(mem_ctx, struct messaging_context);
        if (msg == NULL) {
@@ -547,18 +559,13 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
                return NULL;
        }
 
-       if (ev == NULL) {
-               ev = event_context_init(msg);
-       }
-
        /* create the messaging directory if needed */
-       dir = smbd_tmp_path(msg, "messaging");
        mkdir(dir, 0700);
-       talloc_free(dir);
 
-       msg->base_path     = smbd_tmp_path(msg, "messaging");
+       msg->base_path     = talloc_reference(msg, dir);
        msg->path          = messaging_path(msg, server_id);
        msg->server_id     = server_id;
+       msg->iconv_convenience = iconv_convenience;
        msg->idr           = idr_init(msg);
        msg->dispatch_tree = idr_init(msg);
        msg->start_time    = timeval_current();
@@ -588,7 +595,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
        }
 
        /* it needs to be non blocking for sends */
-       set_blocking(socket_get_fd(msg->sock), False);
+       set_blocking(socket_get_fd(msg->sock), false);
 
        msg->event.ev   = talloc_reference(msg, ev);
        msg->event.fde  = event_add_fd(ev, msg, socket_get_fd(msg->sock), 
@@ -607,12 +614,14 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
    A hack, for the short term until we get 'client only' messaging in place 
 */
 struct messaging_context *messaging_client_init(TALLOC_CTX *mem_ctx, 
+                                               const char *dir,
+                                               struct smb_iconv_convenience *iconv_convenience,
                                                struct event_context *ev)
 {
        struct server_id id;
        ZERO_STRUCT(id);
        id.id = random() % 0x10000000;
-       return messaging_init(mem_ctx, id, ev);
+       return messaging_init(mem_ctx, dir, id, iconv_convenience, ev);
 }
 /*
   a list of registered irpc server functions
@@ -664,19 +673,21 @@ NTSTATUS irpc_register(struct messaging_context *msg_ctx,
 static void irpc_handler_reply(struct messaging_context *msg_ctx, struct irpc_message *m)
 {
        struct irpc_request *irpc;
+       enum ndr_err_code ndr_err;
 
-       irpc = idr_find(msg_ctx->idr, m->header.callid);
+       irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
        if (irpc == NULL) return;
 
        /* parse the reply data */
-       irpc->status = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
-       if (NT_STATUS_IS_OK(irpc->status)) {
+       ndr_err = irpc->table->calls[irpc->callnum].ndr_pull(m->ndr, NDR_OUT, irpc->r);
+       if (NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                irpc->status = m->header.status;
                talloc_steal(irpc->mem_ctx, m);
        } else {
+               irpc->status = ndr_map_error2ntstatus(ndr_err);
                talloc_steal(irpc, m);
        }
-       irpc->done = True;
+       irpc->done = true;
        if (irpc->async.fn) {
                irpc->async.fn(irpc);
        }
@@ -689,11 +700,12 @@ NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
 {
        struct ndr_push *push;
        DATA_BLOB packet;
+       enum ndr_err_code ndr_err;
 
        m->header.status = status;
 
        /* setup the reply */
-       push = ndr_push_init_ctx(m->ndr);
+       push = ndr_push_init_ctx(m->ndr, m->msg_ctx->iconv_convenience);
        if (push == NULL) {
                status = NT_STATUS_NO_MEMORY;
                goto failed;
@@ -702,11 +714,17 @@ NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
        m->header.flags |= IRPC_FLAG_REPLY;
 
        /* construct the packet */
-       status = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               goto failed;
+       }
 
-       status = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               status = ndr_map_error2ntstatus(ndr_err);
+               goto failed;
+       }
 
        /* send the reply message */
        packet = ndr_push_blob(push);
@@ -726,7 +744,7 @@ static void irpc_handler_request(struct messaging_context *msg_ctx,
 {
        struct irpc_list *i;
        void *r;
-       NTSTATUS status;
+       enum ndr_err_code ndr_err;
 
        for (i=msg_ctx->irpc; i; i=i->next) {
                if (GUID_equal(&i->uuid, &m->header.uuid) &&
@@ -747,12 +765,12 @@ static void irpc_handler_request(struct messaging_context *msg_ctx,
        if (r == NULL) goto failed;
 
        /* parse the request data */
-       status = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
        /* make the call */
        m->private     = i->private;
-       m->defer_reply = False;
+       m->defer_reply = false;
        m->msg_ctx     = msg_ctx;
        m->irpc        = i;
        m->data        = r;
@@ -780,20 +798,20 @@ static void irpc_handler(struct messaging_context *msg_ctx, void *private,
                         uint32_t msg_type, struct server_id src, DATA_BLOB *packet)
 {
        struct irpc_message *m;
-       NTSTATUS status;
+       enum ndr_err_code ndr_err;
 
        m = talloc(msg_ctx, struct irpc_message);
        if (m == NULL) goto failed;
 
        m->from = src;
 
-       m->ndr = ndr_pull_init_blob(packet, m);
+       m->ndr = ndr_pull_init_blob(packet, m, msg_ctx->iconv_convenience);
        if (m->ndr == NULL) goto failed;
 
        m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
 
-       status = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
        if (m->header.flags & IRPC_FLAG_REPLY) {
                irpc_handler_reply(msg_ctx, m);
@@ -831,7 +849,7 @@ static void irpc_timeout(struct event_context *ev, struct timed_event *te,
 {
        struct irpc_request *irpc = talloc_get_type(private, struct irpc_request);
        irpc->status = NT_STATUS_IO_TIMEOUT;
-       irpc->done = True;
+       irpc->done = true;
        if (irpc->async.fn) {
                irpc->async.fn(irpc);
        }
@@ -851,6 +869,7 @@ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
        NTSTATUS status;
        DATA_BLOB packet;
        struct irpc_request *irpc;
+       enum ndr_err_code ndr_err;
 
        irpc = talloc(msg_ctx, struct irpc_request);
        if (irpc == NULL) goto failed;
@@ -861,10 +880,10 @@ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
        irpc->callid   = idr_get_new(msg_ctx->idr, irpc, UINT16_MAX);
        if (irpc->callid == -1) goto failed;
        irpc->r        = r;
-       irpc->done     = False;
+       irpc->done     = false;
        irpc->async.fn = NULL;
        irpc->mem_ctx  = ctx;
-       irpc->reject_free = False;
+       irpc->reject_free = false;
 
        talloc_set_destructor(irpc, irpc_destructor);
 
@@ -878,14 +897,14 @@ struct irpc_request *irpc_call_send(struct messaging_context *msg_ctx,
        header.status     = NT_STATUS_OK;
 
        /* construct the irpc packet */
-       ndr = ndr_push_init_ctx(irpc);
+       ndr = ndr_push_init_ctx(irpc, msg_ctx->iconv_convenience);
        if (ndr == NULL) goto failed;
 
-       status = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
-       status = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
-       if (!NT_STATUS_IS_OK(status)) goto failed;
+       ndr_err = table->calls[callnum].ndr_push(ndr, NDR_IN, r);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
 
        /* and send it */
        packet = ndr_push_blob(ndr);
@@ -1035,7 +1054,7 @@ struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
        for (i=0;i<count;i++) {
                ret[i] = ((struct server_id *)rec.dptr)[i];
        }
-       ret[i] = cluster_id(0);
+       ret[i] = cluster_id(0, 0);
        free(rec.dptr);
        tdb_unlock_bystring(t->tdb, name);
        talloc_free(t);
@@ -1065,8 +1084,14 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
                return;
        }
        rec = tdb_fetch_bystring(t->tdb, name);
+       if (rec.dptr == NULL) {
+               tdb_unlock_bystring(t->tdb, name);
+               talloc_free(t);
+               return;
+       }
        count = rec.dsize / sizeof(struct server_id);
        if (count == 0) {
+               free(rec.dptr);
                tdb_unlock_bystring(t->tdb, name);
                talloc_free(t);
                return;
@@ -1087,3 +1112,8 @@ void irpc_remove_name(struct messaging_context *msg_ctx, const char *name)
        tdb_unlock_bystring(t->tdb, name);
        talloc_free(t);
 }
+
+struct server_id messaging_get_server_id(struct messaging_context *msg_ctx)
+{
+       return msg_ctx->server_id;
+}