Rework cluster_id() to take an additional argument, as we need
authorAndrew Bartlett <abartlet@samba.org>
Mon, 4 Feb 2008 06:51:38 +0000 (17:51 +1100)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 4 Feb 2008 06:51:38 +0000 (17:51 +1100)
<node>.<pid>.<fd> to be unique in a prefork process environment.

Andrew Bartlett and David Disseldorp

source/cluster/cluster.c
source/cluster/cluster.h
source/cluster/cluster_private.h
source/cluster/ctdb/ctdb_cluster.c
source/cluster/local.c
source/lib/messaging/messaging.c
source/lib/messaging/tests/irpc.c
source/lib/messaging/tests/messaging.c
source/librpc/idl/misc.idl
source/scripting/ejs/smbcalls_rpc.c
source/smbd/service_stream.c

index 6bac1dcbe504f79597c3c0d29155232abd7ca9b6..cc61974cbd06adcadbd9dcdf96ffc75b7ab6eee4 100644 (file)
@@ -47,12 +47,12 @@ static void cluster_init(void)
 }
 
 /*
-  server a server_id for the local node
+  create a server_id for the local node
 */
-struct server_id cluster_id(uint32_t id)
+struct server_id cluster_id(uint64_t id, uint32_t id2)
 {
        cluster_init();
-       return ops->cluster_id(ops, id);
+       return ops->cluster_id(ops, id, id2);
 }
 
 
index 7cd31282cc10aab94252197f78dd47c45d4d8eb1..203aef439c78902681f05429af928eaae2e32b73 100644 (file)
@@ -36,7 +36,7 @@ struct messaging_context;
 typedef void (*cluster_message_fn_t)(struct messaging_context *, DATA_BLOB);
 
 /* prototypes */
-struct server_id cluster_id(uint32_t id);
+struct server_id cluster_id(uint64_t id, uint32_t id2);
 const char *cluster_id_string(TALLOC_CTX *mem_ctx, struct server_id id);
 struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, struct loadparm_context *lp_ctx, const char *dbname, int flags);
 void *cluster_backend_handle(void);
index 1c895b8640d54e1d46f323280fd813c5c4d6ade5..79394b46dbc1565be9af80cc754c9598a7462a78 100644 (file)
@@ -23,7 +23,7 @@
 #define _CLUSTER_PRIVATE_H_
 
 struct cluster_ops {
-       struct server_id (*cluster_id)(struct cluster_ops *ops, uint32_t id);
+       struct server_id (*cluster_id)(struct cluster_ops *ops, uint64_t id, uint32_t id2);
        const char *(*cluster_id_string)(struct cluster_ops *ops, 
                                         TALLOC_CTX *, struct server_id );
        struct tdb_wrap *(*cluster_tdb_tmp_open)(struct cluster_ops *,
index 53df1e968e7f45ec07125be1dc2574c14d51a5c4..ce295c4474ff9a88981488d81f7a80920947c4b0 100644 (file)
@@ -52,13 +52,14 @@ struct cluster_state {
 /*
   return a server_id for a ctdb node
 */
-static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
+static struct server_id ctdb_id(struct cluster_ops *ops, uint64_t id, uint32_t id2)
 {
        struct cluster_state *state = (struct cluster_state *)ops->private;
        struct ctdb_context *ctdb = state->ctdb;
        struct server_id server_id;
        server_id.node = ctdb_get_vnn(ctdb);
        server_id.id = id;
+       server_id.id2 = id2;
        return server_id;
 }
 
@@ -69,7 +70,7 @@ static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
 static const char *ctdb_id_string(struct cluster_ops *ops, 
                                  TALLOC_CTX *mem_ctx, struct server_id id)
 {
-       return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
+       return talloc_asprintf(mem_ctx, "%u.%llu.%u", id.node, (unsigned long long)id.id, id.id2);
 }
 
 /*
index 539e47d271318342d6b97487ef768cd22f45ce78..96636927f13a27de40b5304eaadf7db2d1676680 100644 (file)
 /*
   server a server_id for the local node
 */
-static struct server_id local_id(struct cluster_ops *ops, uint32_t id)
+static struct server_id local_id(struct cluster_ops *ops, uint64_t id, uint32_t id2)
 {
        struct server_id server_id;
        ZERO_STRUCT(server_id);
        server_id.id = id;
+       server_id.id2 = id2;
        return server_id;
 }
 
@@ -46,7 +47,7 @@ static struct server_id local_id(struct cluster_ops *ops, uint32_t id)
 static const char *local_id_string(struct cluster_ops *ops,
                                   TALLOC_CTX *mem_ctx, struct server_id id)
 {
-       return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
+       return talloc_asprintf(mem_ctx, "%u.%llu.%u", id.node, (unsigned long long)id.id, id.id2);
 }
 
 
index 811d5a85bfdbe330e0210ace345bab3fe93a1deb..9cb10f961c25199b20f2ecb5171eec370632f183 100644 (file)
@@ -263,8 +263,10 @@ static void messaging_send_handler(struct messaging_context *msg)
                }
                rec->retries = 0;
                if (!NT_STATUS_IS_OK(status)) {
-                       DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", 
-                                rec->header->from.id, rec->header->to.id, rec->header->msg_type, 
+                       DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", 
+                                cluster_id_string(debug_ctx(), rec->header->from), 
+                                cluster_id_string(debug_ctx(), rec->header->to), 
+                                rec->header->msg_type, 
                                 nt_errstr(status)));
                }
                DLIST_REMOVE(msg->pending, rec);
@@ -1051,7 +1053,7 @@ struct server_id *irpc_servers_byname(struct messaging_context *msg_ctx,
        for (i=0;i<count;i++) {
                ret[i] = ((struct server_id *)rec.dptr)[i];
        }
-       ret[i] = cluster_id(0);
+       ret[i] = cluster_id(0, 0);
        free(rec.dptr);
        tdb_unlock_bystring(t->tdb, name);
        talloc_free(t);
index a2995fc9835148c31eb2c2ae8d6ebc16d9aa76b6..d9b0548643ddbda9c89c47cdcead5a538ea6eed5 100644 (file)
@@ -93,7 +93,7 @@ static bool test_addone(struct torture_context *test, const void *_data,
        r.in.in_data = value;
 
        test_debug = true;
-       status = IRPC_CALL(data->msg_ctx1, cluster_id(MSG_ID2), 
+       status = IRPC_CALL(data->msg_ctx1, cluster_id(0, MSG_ID2), 
                           rpcecho, ECHO_ADDONE, &r, test);
        test_debug = false;
        torture_assert_ntstatus_ok(test, status, "AddOne failed");
@@ -122,7 +122,7 @@ static bool test_echodata(struct torture_context *tctx,
        r.in.in_data = (unsigned char *)talloc_strdup(mem_ctx, "0123456789");
        r.in.len = strlen((char *)r.in.in_data);
 
-       status = IRPC_CALL(data->msg_ctx1, cluster_id(MSG_ID2), 
+       status = IRPC_CALL(data->msg_ctx1, cluster_id(0, MSG_ID2), 
                           rpcecho, ECHO_ECHODATA, &r, 
                           mem_ctx);
        torture_assert_ntstatus_ok(tctx, status, "EchoData failed");
@@ -180,7 +180,7 @@ static bool test_speed(struct torture_context *tctx,
        while (timeval_elapsed(&tv) < timelimit) {
                struct irpc_request *irpc;
 
-               irpc = IRPC_CALL_SEND(data->msg_ctx1, cluster_id(MSG_ID2), 
+               irpc = IRPC_CALL_SEND(data->msg_ctx1, cluster_id(0, MSG_ID2), 
                                      rpcecho, ECHO_ADDONE, 
                                      &r, mem_ctx);
                torture_assert(tctx, irpc != NULL, "AddOne send failed");
@@ -221,7 +221,7 @@ static bool irpc_setup(struct torture_context *tctx, void **_data)
        torture_assert(tctx, data->msg_ctx1 = 
                       messaging_init(tctx, 
                                      lp_messaging_path(tctx, tctx->lp_ctx), 
-                                     cluster_id(MSG_ID1),
+                                     cluster_id(0, MSG_ID1),
                                      lp_iconv_convenience(tctx->lp_ctx),
                                      data->ev),
                       "Failed to init first messaging context");
@@ -229,7 +229,7 @@ static bool irpc_setup(struct torture_context *tctx, void **_data)
        torture_assert(tctx, data->msg_ctx2 = 
                       messaging_init(tctx, 
                                      lp_messaging_path(tctx, tctx->lp_ctx),
-                                     cluster_id(MSG_ID2), 
+                                     cluster_id(0, MSG_ID2), 
                                      lp_iconv_convenience(tctx->lp_ctx),
                                      data->ev),
                       "Failed to init second messaging context");
index 0df04bce2bbc355364c5b4d53d70d2db69336e5c..45b573518cc87d691b81c90686dda96c6af77bb7 100644 (file)
@@ -73,7 +73,7 @@ static bool test_ping_speed(struct torture_context *tctx)
 
        msg_server_ctx = messaging_init(tctx, 
                                        lp_messaging_path(tctx, tctx->lp_ctx), 
-                                       cluster_id(1), 
+                                       cluster_id(0, 1), 
                                        lp_iconv_convenience(tctx->lp_ctx),
                                        ev);
        
@@ -84,7 +84,7 @@ static bool test_ping_speed(struct torture_context *tctx)
 
        msg_client_ctx = messaging_init(tctx, 
                                        lp_messaging_path(tctx, tctx->lp_ctx), 
-                                       cluster_id(2), 
+                                       cluster_id(0, 2), 
                                        lp_iconv_convenience(tctx->lp_ctx),
                                        ev);
 
@@ -103,8 +103,8 @@ static bool test_ping_speed(struct torture_context *tctx)
                data.data = discard_const_p(uint8_t, "testing");
                data.length = strlen((const char *)data.data);
 
-               status1 = messaging_send(msg_client_ctx, cluster_id(1), msg_ping, &data);
-               status2 = messaging_send(msg_client_ctx, cluster_id(1), msg_ping, NULL);
+               status1 = messaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, &data);
+               status2 = messaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, NULL);
 
                torture_assert_ntstatus_ok(tctx, status1, "msg1 failed");
                ping_count++;
@@ -124,7 +124,7 @@ static bool test_ping_speed(struct torture_context *tctx)
        }
 
        torture_comment(tctx, "sending exit\n");
-       messaging_send(msg_client_ctx, cluster_id(1), msg_exit, NULL);
+       messaging_send(msg_client_ctx, cluster_id(0, 1), msg_exit, NULL);
 
        torture_assert_int_equal(tctx, ping_count, pong_count, "ping test failed");
 
index 0861758187dc280783fc4d4cb7c7bd0d6116f296..8331977398dfe1285992f765ad93512b52e99a58 100644 (file)
@@ -51,7 +51,8 @@ interface misc
 
        /* id used to identify a endpoint, possibly in a cluster */
        typedef [public] struct {
-               uint32 id;
+               hyper id;
+               uint32 id2;
                uint32 node;
        } server_id;
 }
index 2bfc8b5883427d7ea4bafc79ea4054613830807a..44cfa16d7e875b78a51c0d54450b89ac46989236 100644 (file)
@@ -80,7 +80,7 @@ static int ejs_irpc_connect(MprVarHandle eid, int argc, char **argv)
        for (i=0;i<10000;i++) {
                p->msg_ctx = messaging_init(p, 
                                            lp_messaging_path(p, global_loadparm),
-                                           cluster_id(EJS_ID_BASE + i), 
+                                           cluster_id(EJS_ID_BASE, i), 
                                            lp_iconv_convenience(global_loadparm),
                                            ev);
                if (p->msg_ctx) break;
index 0d6f1b72817db810e69627eb5a6cd6457e7f50ba..7e1f6493ee8409eeb977032dc271ad777ab1ae3b 100644 (file)
@@ -136,7 +136,7 @@ NTSTATUS stream_new_connection_merge(struct event_context *ev,
        srv_conn->private       = private_data;
        srv_conn->model_ops     = model_ops;
        srv_conn->socket        = sock;
-       srv_conn->server_id     = cluster_id(0);
+       srv_conn->server_id     = cluster_id(0, 0);
        srv_conn->ops           = stream_ops;
        srv_conn->msg_ctx       = msg_ctx;
        srv_conn->event.ctx     = ev;
@@ -274,8 +274,11 @@ NTSTATUS stream_setup_socket(struct event_context *event_context,
                NT_STATUS_NOT_OK_RETURN(status);
        }
 
-       /* TODO: set socket ACL's here when they're implemented */
+       /* TODO: set socket ACL's (host allow etc) here when they're
+        * implemented */
 
+       /* Some sockets don't have a port, or are just described from
+        * the string.  We are indicating this by having port == NULL */
        if (!port) {
                socket_address = socket_address_from_strings(stream_socket, 
                                                             stream_socket->sock->backend_name,
@@ -314,9 +317,16 @@ NTSTATUS stream_setup_socket(struct event_context *event_context,
                return status;
        }
 
-       /* we will close the socket using the events system */
+       /* By specifying EVENT_FD_AUTOCLOSE below, we indicate that we
+        * will close the socket using the events system.  This avoids
+        * nasty interactions with waiting for talloc to close the socket. */
+
        socket_set_flags(stream_socket->sock, SOCKET_FLAG_NOCLOSE);
 
+       /* Add the FD from the newly created socket into the event
+        * subsystem.  it will call the accept handler whenever we get
+        * new connections */
+
        event_add_fd(event_context, stream_socket->sock, 
                     socket_get_fd(stream_socket->sock), 
                     EVENT_FD_READ|EVENT_FD_AUTOCLOSE,