}
/*
- server a server_id for the local node
+ create a server_id for the local node
*/
-struct server_id cluster_id(uint32_t id)
+struct server_id cluster_id(uint64_t id, uint32_t id2)
{
cluster_init();
- return ops->cluster_id(ops, id);
+ return ops->cluster_id(ops, id, id2);
}
typedef void (*cluster_message_fn_t)(struct messaging_context *, DATA_BLOB);
/* prototypes */
-struct server_id cluster_id(uint32_t id);
+struct server_id cluster_id(uint64_t id, uint32_t id2);
const char *cluster_id_string(TALLOC_CTX *mem_ctx, struct server_id id);
struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, struct loadparm_context *lp_ctx, const char *dbname, int flags);
void *cluster_backend_handle(void);
#define _CLUSTER_PRIVATE_H_
struct cluster_ops {
- struct server_id (*cluster_id)(struct cluster_ops *ops, uint32_t id);
+ struct server_id (*cluster_id)(struct cluster_ops *ops, uint64_t id, uint32_t id2);
const char *(*cluster_id_string)(struct cluster_ops *ops,
TALLOC_CTX *, struct server_id );
struct tdb_wrap *(*cluster_tdb_tmp_open)(struct cluster_ops *,
/*
return a server_id for a ctdb node
*/
-static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
+static struct server_id ctdb_id(struct cluster_ops *ops, uint64_t id, uint32_t id2)
{
struct cluster_state *state = (struct cluster_state *)ops->private;
struct ctdb_context *ctdb = state->ctdb;
struct server_id server_id;
server_id.node = ctdb_get_vnn(ctdb);
server_id.id = id;
+ server_id.id2 = id2;
return server_id;
}
static const char *ctdb_id_string(struct cluster_ops *ops,
TALLOC_CTX *mem_ctx, struct server_id id)
{
- return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
+ return talloc_asprintf(mem_ctx, "%u.%llu.%u", id.node, (unsigned long long)id.id, id.id2);
}
/*
/*
server a server_id for the local node
*/
-static struct server_id local_id(struct cluster_ops *ops, uint32_t id)
+static struct server_id local_id(struct cluster_ops *ops, uint64_t id, uint32_t id2)
{
struct server_id server_id;
ZERO_STRUCT(server_id);
server_id.id = id;
+ server_id.id2 = id2;
return server_id;
}
static const char *local_id_string(struct cluster_ops *ops,
TALLOC_CTX *mem_ctx, struct server_id id)
{
- return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
+ return talloc_asprintf(mem_ctx, "%u.%llu.%u", id.node, (unsigned long long)id.id, id.id2);
}
}
rec->retries = 0;
if (!NT_STATUS_IS_OK(status)) {
- DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n",
- rec->header->from.id, rec->header->to.id, rec->header->msg_type,
+ DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
+ cluster_id_string(debug_ctx(), rec->header->from),
+ cluster_id_string(debug_ctx(), rec->header->to),
+ rec->header->msg_type,
nt_errstr(status)));
}
DLIST_REMOVE(msg->pending, rec);
for (i=0;i<count;i++) {
ret[i] = ((struct server_id *)rec.dptr)[i];
}
- ret[i] = cluster_id(0);
+ ret[i] = cluster_id(0, 0);
free(rec.dptr);
tdb_unlock_bystring(t->tdb, name);
talloc_free(t);
r.in.in_data = value;
test_debug = true;
- status = IRPC_CALL(data->msg_ctx1, cluster_id(MSG_ID2),
+ status = IRPC_CALL(data->msg_ctx1, cluster_id(0, MSG_ID2),
rpcecho, ECHO_ADDONE, &r, test);
test_debug = false;
torture_assert_ntstatus_ok(test, status, "AddOne failed");
r.in.in_data = (unsigned char *)talloc_strdup(mem_ctx, "0123456789");
r.in.len = strlen((char *)r.in.in_data);
- status = IRPC_CALL(data->msg_ctx1, cluster_id(MSG_ID2),
+ status = IRPC_CALL(data->msg_ctx1, cluster_id(0, MSG_ID2),
rpcecho, ECHO_ECHODATA, &r,
mem_ctx);
torture_assert_ntstatus_ok(tctx, status, "EchoData failed");
while (timeval_elapsed(&tv) < timelimit) {
struct irpc_request *irpc;
- irpc = IRPC_CALL_SEND(data->msg_ctx1, cluster_id(MSG_ID2),
+ irpc = IRPC_CALL_SEND(data->msg_ctx1, cluster_id(0, MSG_ID2),
rpcecho, ECHO_ADDONE,
&r, mem_ctx);
torture_assert(tctx, irpc != NULL, "AddOne send failed");
torture_assert(tctx, data->msg_ctx1 =
messaging_init(tctx,
lp_messaging_path(tctx, tctx->lp_ctx),
- cluster_id(MSG_ID1),
+ cluster_id(0, MSG_ID1),
lp_iconv_convenience(tctx->lp_ctx),
data->ev),
"Failed to init first messaging context");
torture_assert(tctx, data->msg_ctx2 =
messaging_init(tctx,
lp_messaging_path(tctx, tctx->lp_ctx),
- cluster_id(MSG_ID2),
+ cluster_id(0, MSG_ID2),
lp_iconv_convenience(tctx->lp_ctx),
data->ev),
"Failed to init second messaging context");
msg_server_ctx = messaging_init(tctx,
lp_messaging_path(tctx, tctx->lp_ctx),
- cluster_id(1),
+ cluster_id(0, 1),
lp_iconv_convenience(tctx->lp_ctx),
ev);
msg_client_ctx = messaging_init(tctx,
lp_messaging_path(tctx, tctx->lp_ctx),
- cluster_id(2),
+ cluster_id(0, 2),
lp_iconv_convenience(tctx->lp_ctx),
ev);
data.data = discard_const_p(uint8_t, "testing");
data.length = strlen((const char *)data.data);
- status1 = messaging_send(msg_client_ctx, cluster_id(1), msg_ping, &data);
- status2 = messaging_send(msg_client_ctx, cluster_id(1), msg_ping, NULL);
+ status1 = messaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, &data);
+ status2 = messaging_send(msg_client_ctx, cluster_id(0, 1), msg_ping, NULL);
torture_assert_ntstatus_ok(tctx, status1, "msg1 failed");
ping_count++;
}
torture_comment(tctx, "sending exit\n");
- messaging_send(msg_client_ctx, cluster_id(1), msg_exit, NULL);
+ messaging_send(msg_client_ctx, cluster_id(0, 1), msg_exit, NULL);
torture_assert_int_equal(tctx, ping_count, pong_count, "ping test failed");
/* id used to identify a endpoint, possibly in a cluster */
typedef [public] struct {
- uint32 id;
+ hyper id;
+ uint32 id2;
uint32 node;
} server_id;
}
for (i=0;i<10000;i++) {
p->msg_ctx = messaging_init(p,
lp_messaging_path(p, global_loadparm),
- cluster_id(EJS_ID_BASE + i),
+ cluster_id(EJS_ID_BASE, i),
lp_iconv_convenience(global_loadparm),
ev);
if (p->msg_ctx) break;
srv_conn->private = private_data;
srv_conn->model_ops = model_ops;
srv_conn->socket = sock;
- srv_conn->server_id = cluster_id(0);
+ srv_conn->server_id = cluster_id(0, 0);
srv_conn->ops = stream_ops;
srv_conn->msg_ctx = msg_ctx;
srv_conn->event.ctx = ev;
NT_STATUS_NOT_OK_RETURN(status);
}
- /* TODO: set socket ACL's here when they're implemented */
+ /* TODO: set socket ACL's (host allow etc) here when they're
+ * implemented */
+ /* Some sockets don't have a port, or are just described from
+ * the string. We are indicating this by having port == NULL */
if (!port) {
socket_address = socket_address_from_strings(stream_socket,
stream_socket->sock->backend_name,
return status;
}
- /* we will close the socket using the events system */
+ /* By specifying EVENT_FD_AUTOCLOSE below, we indicate that we
+ * will close the socket using the events system. This avoids
+ * nasty interactions with waiting for talloc to close the socket. */
+
socket_set_flags(stream_socket->sock, SOCKET_FLAG_NOCLOSE);
+ /* Add the FD from the newly created socket into the event
+ * subsystem. it will call the accept handler whenever we get
+ * new connections */
+
event_add_fd(event_context, stream_socket->sock,
socket_get_fd(stream_socket->sock),
EVENT_FD_READ|EVENT_FD_AUTOCLOSE,