r21233: first version of samba4 messaging using ctdb is working. This means we
authorAndrew Tridgell <tridge@samba.org>
Thu, 8 Feb 2007 02:59:58 +0000 (02:59 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:44:50 +0000 (14:44 -0500)
should now work on a real cluster, and not just a localhost simulator
(This used to be commit f05072ad74fb08fd906bc500c5e89930bcc3387f)

source4/cluster/cluster.c
source4/cluster/cluster.h
source4/cluster/ctdb/ctdb_cluster.c
source4/lib/messaging/messaging.c

index ea800d2f62f692ce9798b5e6a6d1269f2b5a041e..4be52b8233bd12c4b623c8696b113be737e7f604 100644 (file)
@@ -80,8 +80,7 @@ struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, i
   register a callback function for a messaging endpoint
 */
 NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server,
-                             void (*handler)(struct messaging_context *, 
-                                             struct server_id, uint32_t, DATA_BLOB))
+                             cluster_message_fn_t handler)
 {
        cluster_init();
        return ops->message_init(ops, msg, server, handler);
index 6f076e7f783a2855fceb2520380f5e62834840f9..d182bf5526211d6dd3d75570c420c49a04ac4d27 100644 (file)
@@ -34,8 +34,7 @@
 #define cluster_node_equal(id1, id2) ((id1)->node == (id2)->node)
 
 struct messaging_context;
-typedef void (*cluster_message_fn_t)(struct messaging_context *, 
-                                    struct server_id, uint32_t, DATA_BLOB);
+typedef void (*cluster_message_fn_t)(struct messaging_context *, DATA_BLOB);
 
 /* prototypes */
 struct server_id cluster_id(uint32_t id);
index 464cb8ecba92be05a260cc2d3428dfeb5be4a799..95adbafadff6718b51880f7cba9b72abb4d82de0 100644 (file)
 #include "lib/tdb/include/tdb.h"
 #include "include/ctdb.h"
 #include "db_wrap.h"
+#include "lib/util/dlinklist.h"
+
+/* a linked list of messaging handlers, allowing incoming messages
+   to be directed to the right messaging context */
+struct cluster_messaging_list {
+       struct cluster_messaging_list *next, *prev;
+       struct cluster_state *state;
+       struct messaging_context *msg;
+       struct server_id server;
+       cluster_message_fn_t handler;
+};
 
 struct cluster_state {
        struct ctdb_context *ctdb;
+       struct cluster_messaging_list *list;
 };
 
 
+
 /*
   return a server_id for a ctdb node
 */
@@ -91,6 +104,33 @@ static void *ctdb_backend_handle(struct cluster_ops *ops)
        return (void *)state->ctdb;
 }
 
+/*
+  dispatch incoming ctdb messages
+*/
+static void ctdb_message_handler(struct ctdb_context *ctdb, uint32_t srvid, 
+                                TDB_DATA data, void *private)
+{
+       struct cluster_state *state = talloc_get_type(private, struct cluster_state);
+       struct cluster_messaging_list *m;
+       for (m=state->list;m;m=m->next) {
+               if (srvid == m->server.id) {
+                       DATA_BLOB bdata;
+                       bdata.data   = data.dptr;
+                       bdata.length = data.dsize;
+                       m->handler(m->msg, bdata);
+               }
+       }
+}
+
+/*
+  destroy a element of messaging list (when messaging context goes away)
+*/
+static int cluster_messaging_destructor(struct cluster_messaging_list *m)
+{
+       DLIST_REMOVE(m->state->list, m);
+       return 0;
+}
+
 /*
   setup a handler for ctdb messages
 */
@@ -99,6 +139,19 @@ static NTSTATUS ctdb_message_init(struct cluster_ops *ops,
                                  struct server_id server,
                                  cluster_message_fn_t handler)
 {
+       struct cluster_state *state = ops->private;
+       struct cluster_messaging_list *m;
+       m = talloc(msg, struct cluster_messaging_list);
+       NT_STATUS_HAVE_NO_MEMORY(m);
+       
+       m->state   = state;
+       m->msg     = msg;
+       m->server  = server;
+       m->handler = handler;
+       DLIST_ADD(state->list, m);
+
+       talloc_set_destructor(m, cluster_messaging_destructor);
+
        return NT_STATUS_OK;
 }
 
@@ -109,7 +162,19 @@ static NTSTATUS ctdb_message_send(struct cluster_ops *ops,
                                  struct server_id server, uint32_t msg_type, 
                                  DATA_BLOB *data)
 {
-       return NT_STATUS_INVALID_DEVICE_REQUEST;
+       struct cluster_state *state = ops->private;
+       struct ctdb_context *ctdb = state->ctdb;
+       TDB_DATA tdata;
+       int ret;
+
+       tdata.dptr = data->data;
+       tdata.dsize = data->length;
+
+       ret = ctdb_send_message(ctdb, server.node, server.id, msg_type, tdata);
+       if (ret != 0) {
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+       return NT_STATUS_OK;
 }
 
 static struct cluster_ops cluster_ctdb_ops = {
@@ -148,6 +213,8 @@ void cluster_ctdb_init(struct event_context *ev)
        state->ctdb = ctdb_init(ev);
        if (state->ctdb == NULL) goto failed;
 
+       state->list = NULL;
+
        cluster_ctdb_ops.private = state;
 
        ret = ctdb_set_transport(state->ctdb, transport);
@@ -181,6 +248,14 @@ void cluster_ctdb_init(struct event_context *ev)
                goto failed;
         }
 
+       /* setup messaging handler */
+       ret = ctdb_set_message_handler(state->ctdb, ctdb_message_handler, state);
+        if (ret == -1) {
+                DEBUG(0,("ctdb_set_message_handler failed - %s\n", 
+                        ctdb_errstr(state->ctdb)));
+               goto failed;
+        }
+
        ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
        if (ret == -1) {
                DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb)));
@@ -199,6 +274,7 @@ void cluster_ctdb_init(struct event_context *ev)
        ctdb_connect_wait(state->ctdb);
 
        cluster_set_ops(&cluster_ctdb_ops);
+
        return;
        
 failed:
index a0439377332180aec1228dda0ddeaf35ca2ca306..03bfb6b571f7820770bd06900f8d3aa25d9dd435 100644 (file)
@@ -155,8 +155,7 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
 /*
   handler for messages that arrive from other nodes in the cluster
 */
-static void cluster_message_handler(struct messaging_context *msg, struct server_id from,
-                                   uint32_t msg_type, DATA_BLOB packet)
+static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet)
 {
        struct messaging_rec *rec;
 
@@ -165,7 +164,6 @@ static void cluster_message_handler(struct messaging_context *msg, struct server
                smb_panic("Unable to allocate messaging_rec");
        }
 
-       talloc_steal(rec, packet.data);
        rec->msg           = msg;
        rec->path          = msg->path;
        rec->header        = (struct messaging_header *)packet.data;
@@ -406,12 +404,6 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
        NTSTATUS status;
        size_t dlength = data?data->length:0;
 
-       if (!cluster_node_equal(&msg->server_id, &server)) {
-               /* the destination is on another node - dispatch via
-                  the cluster layer */
-               return cluster_message_send(server, msg_type, data);
-       }
-
        rec = talloc(msg, struct messaging_rec);
        if (rec == NULL) {
                return NT_STATUS_NO_MEMORY;
@@ -435,6 +427,14 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
                       data->data, dlength);
        }
 
+       if (!cluster_node_equal(&msg->server_id, &server)) {
+               /* the destination is on another node - dispatch via
+                  the cluster layer */
+               status = cluster_message_send(server, msg_type, &rec->packet);
+               talloc_free(rec);
+               return status;
+       }
+
        rec->path = messaging_path(msg, server);
        talloc_steal(rec, rec->path);