#include "includes.h"
#include "util_tdb.h"
#include "serverid.h"
+#include "ctdbd_conn.h"
#ifdef CLUSTER_SUPPORT
-#include "ctdbd_conn.h"
#include "ctdb_packet.h"
#include "messages.h"
struct ctdbd_connection {
struct messaging_context *msg_ctx;
- uint32 reqid;
- uint32 our_vnn;
- uint64 rand_srvid;
+ uint32_t reqid;
+ uint32_t our_vnn;
+ uint64_t rand_srvid;
struct ctdb_packet_context *pkt;
- struct fd_event *fde;
+ struct tevent_fd *fde;
void (*release_ip_handler)(const char *ip_addr, void *private_data);
void *release_ip_priv;
}
static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
- uint32_t vnn, uint32 opcode,
- uint64_t srvid, uint32_t flags, TDB_DATA data,
+ uint32_t vnn, uint32_t opcode,
+ uint64_t srvid, uint32_t flags, TDB_DATA data,
TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
int *cstatus);
/*
* get our vnn from the cluster
*/
-static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32 *vnn)
+static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
{
int32_t cstatus=-1;
NTSTATUS status;
CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
tdb_null, NULL, NULL, &cstatus);
if (!NT_STATUS_IS_OK(status)) {
- cluster_fatal("ctdbd_control failed\n");
+ DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
+ return status;
}
*vnn = (uint32_t)cstatus;
return status;
CTDB_CONTROL_GET_NODEMAP, 0, 0,
tdb_null, talloc_tos(), &outdata, &cstatus);
if (!NT_STATUS_IS_OK(status)) {
- cluster_fatal("ctdbd_control failed\n");
+ DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
+ return false;
}
if ((cstatus != 0) || (outdata.dptr == NULL)) {
DEBUG(2, ("Received invalid ctdb data\n"));
return ret;
}
-uint32 ctdbd_vnn(const struct ctdbd_connection *conn)
+uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
{
return conn->our_vnn;
}
{
struct ctdb_packet_context *result;
const char *sockname = lp_ctdbd_socket();
- struct sockaddr_un addr;
+ struct sockaddr_un addr = { 0, };
int fd;
socklen_t salen;
- if (!sockname || !*sockname) {
- sockname = CTDB_PATH;
- }
-
fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1) {
DEBUG(3, ("Could not create socket: %s\n", strerror(errno)));
return map_nt_error_from_unix(errno);
}
- ZERO_STRUCT(addr);
addr.sun_family = AF_UNIX;
- strncpy(addr.sun_path, sockname, sizeof(addr.sun_path));
+ snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", sockname);
salen = sizeof(struct sockaddr_un);
if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) {
size_t *length,
void *private_data)
{
- uint32 msglen;
+ uint32_t msglen;
if (available < sizeof(msglen)) {
return False;
}
- msglen = *((uint32 *)buf);
+ msglen = *((const uint32_t *)buf);
DEBUG(11, ("msglen = %d\n", msglen));
* Timed event handler for the deferred message
*/
-static void deferred_message_dispatch(struct event_context *event_ctx,
- struct timed_event *te,
+static void deferred_message_dispatch(struct tevent_context *event_ctx,
+ struct tevent_timer *te,
struct timeval now,
void *private_data)
{
* messages that might come in between.
*/
-static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32 reqid,
+static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
TALLOC_CTX *mem_ctx, void *result)
{
struct ctdb_req_header *hdr;
ctdb_packet_dump(hdr);
if (hdr->operation == CTDB_REQ_MESSAGE) {
- struct timed_event *evt;
+ struct tevent_timer *evt;
struct deferred_msg_state *msg_state;
struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
* We're waiting for a call reply, but an async message has
* crossed. Defer dispatching to the toplevel event loop.
*/
- evt = event_add_timed(conn->msg_ctx->event_ctx,
+ evt = tevent_add_timer(conn->msg_ctx->event_ctx,
conn->msg_ctx->event_ctx,
timeval_zero(),
deferred_message_dispatch,
* The ctdbd socket is readable asynchronuously
*/
-static void ctdbd_socket_handler(struct event_context *event_ctx,
- struct fd_event *event,
+static void ctdbd_socket_handler(struct tevent_context *event_ctx,
+ struct tevent_fd *event,
uint16 flags,
void *private_data)
{
SMB_ASSERT(conn->msg_ctx == NULL);
SMB_ASSERT(conn->fde == NULL);
- if (!(conn->fde = event_add_fd(msg_ctx->event_ctx, conn,
+ if (!(conn->fde = tevent_add_fd(msg_ctx->event_ctx, conn,
ctdb_packet_get_fd(conn->pkt),
- EVENT_FD_READ,
+ TEVENT_FD_READ,
ctdbd_socket_handler,
conn))) {
DEBUG(0, ("event_add_fd failed\n"));
*/
NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn,
- uint32 dst_vnn, uint64 dst_srvid,
+ uint32_t dst_vnn, uint64_t dst_srvid,
struct messaging_rec *msg)
{
- struct ctdb_req_message r;
- TALLOC_CTX *mem_ctx;
DATA_BLOB blob;
NTSTATUS status;
enum ndr_err_code ndr_err;
- if (!(mem_ctx = talloc_init("ctdbd_messaging_send"))) {
- DEBUG(0, ("talloc failed\n"));
- return NT_STATUS_NO_MEMORY;
- }
-
ndr_err = ndr_push_struct_blob(
- &blob, mem_ctx, msg,
+ &blob, talloc_tos(), msg,
(ndr_push_flags_fn_t)ndr_push_messaging_rec);
if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
DEBUG(0, ("ndr_push_struct_blob failed: %s\n",
ndr_errstr(ndr_err)));
- status = ndr_map_error2ntstatus(ndr_err);
- goto fail;
+ return ndr_map_error2ntstatus(ndr_err);
}
- r.hdr.length = offsetof(struct ctdb_req_message, data) + blob.length;
+ status = ctdbd_messaging_send_blob(conn, dst_vnn, dst_srvid,
+ blob.data, blob.length);
+ TALLOC_FREE(blob.data);
+ return status;
+}
+
+NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
+ uint32_t dst_vnn, uint64_t dst_srvid,
+ const uint8_t *buf, size_t buflen)
+{
+ struct ctdb_req_message r;
+ NTSTATUS status;
+
+ r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
r.hdr.ctdb_magic = CTDB_MAGIC;
r.hdr.ctdb_version = CTDB_VERSION;
r.hdr.generation = 1;
r.hdr.srcnode = conn->our_vnn;
r.hdr.reqid = 0;
r.srvid = dst_srvid;
- r.datalen = blob.length;
+ r.datalen = buflen;
DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
ctdb_packet_dump(&r.hdr);
status = ctdb_packet_send(
conn->pkt, 2,
data_blob_const(&r, offsetof(struct ctdb_req_message, data)),
- blob);
+ data_blob_const(buf, buflen));
if (!NT_STATUS_IS_OK(status)) {
DEBUG(0, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
- goto fail;
+ return status;
}
status = ctdb_packet_flush(conn->pkt);
-
if (!NT_STATUS_IS_OK(status)) {
DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
cluster_fatal("cluster dispatch daemon msg write error\n");
}
-
- status = NT_STATUS_OK;
- fail:
- TALLOC_FREE(mem_ctx);
- return status;
+ return NT_STATUS_OK;
}
/*
* send/recv a generic ctdb control message
*/
static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
- uint32_t vnn, uint32 opcode,
- uint64_t srvid, uint32_t flags,
- TDB_DATA data,
+ uint32_t vnn, uint32_t opcode,
+ uint64_t srvid, uint32_t flags,
+ TDB_DATA data,
TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
int *cstatus)
{
/*
* see if a remote process exists
*/
-bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32 vnn, pid_t pid)
+bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
{
struct server_id id;
bool result;
vnn_indexes = talloc_array(mem_ctx, unsigned, num_pids);
if (vnn_indexes == NULL) {
+ DEBUG(1, ("talloc_array failed\n"));
goto fail;
}
vnns = talloc_realloc(mem_ctx, vnns, struct ctdb_vnn_list,
num_vnns+1);
if (vnns == NULL) {
+ DEBUG(1, ("talloc_realloc failed\n"));
goto fail;
}
vnns[num_vnns].vnn = vnn;
vnn->srvids = talloc_array(vnns, uint64_t, vnn->num_srvids);
if (vnn->srvids == NULL) {
+ DEBUG(1, ("talloc_array failed\n"));
goto fail;
}
vnn->pid_indexes = talloc_array(vnns, unsigned,
vnn->num_srvids);
if (vnn->pid_indexes == NULL) {
+ DEBUG(1, ("talloc_array failed\n"));
goto fail;
}
}
if (!ctdb_collect_vnns(talloc_tos(), pids, num_pids,
&vnns, &num_vnns)) {
+ DEBUG(1, ("ctdb_collect_vnns failed\n"));
goto fail;
}
data)),
data_blob_const(vnn->srvids, req.datalen));
if (!NT_STATUS_IS_OK(status)) {
- DEBUG(10, ("ctdb_packet_send failed: %s\n",
- nt_errstr(status)));
+ DEBUG(1, ("ctdb_packet_send failed: %s\n",
+ nt_errstr(status)));
goto fail;
}
}
status = ctdb_packet_flush(conn->pkt);
if (!NT_STATUS_IS_OK(status)) {
- DEBUG(10, ("ctdb_packet_flush failed: %s\n",
- nt_errstr(status)));
+ DEBUG(1, ("ctdb_packet_flush failed: %s\n",
+ nt_errstr(status)));
goto fail;
}
struct ctdb_reply_control *reply = NULL;
struct ctdb_vnn_list *vnn;
uint32_t reqid;
+ uint8_t *reply_data;
status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply);
if (!NT_STATUS_IS_OK(status)) {
- DEBUG(10, ("ctdb_read_req failed: %s\n",
- nt_errstr(status)));
+ DEBUG(1, ("ctdb_read_req failed: %s\n",
+ nt_errstr(status)));
goto fail;
}
if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
- DEBUG(10, ("Received invalid reply\n"));
+ DEBUG(1, ("Received invalid reply %u\n",
+ (unsigned)reply->hdr.operation));
goto fail;
}
}
}
if (i == num_vnns) {
- DEBUG(10, ("Received unknown reqid number %u\n",
- (unsigned)reqid));
+ DEBUG(1, ("Received unknown reqid number %u\n",
+ (unsigned)reqid));
goto fail;
}
(unsigned)vnn->vnn, vnn->num_srvids,
(unsigned)reply->datalen));
- if (reply->datalen < ((vnn->num_srvids+7)/8)) {
- DEBUG(10, ("Received short reply\n"));
- goto fail;
+ if (reply->datalen >= ((vnn->num_srvids+7)/8)) {
+ /*
+ * Got a real reply
+ */
+ reply_data = reply->data;
+ } else {
+ /*
+ * Got an error reply
+ */
+ DEBUG(5, ("Received short reply len %d, status %u, "
+ "errorlen %u\n",
+ (unsigned)reply->datalen,
+ (unsigned)reply->status,
+ (unsigned)reply->errorlen));
+ dump_data(5, reply->data, reply->errorlen);
+
+ /*
+ * This will trigger everything set to false
+ */
+ reply_data = NULL;
}
for (i=0; i<vnn->num_srvids; i++) {
results[idx] = true;
continue;
}
- results[idx] = ((reply->data[i/8] & (1<<(i%8))) != 0);
+ results[idx] =
+ (reply_data != NULL) &&
+ ((reply_data[i/8] & (1<<(i%8))) != 0);
}
TALLOC_FREE(reply);
int32_t cstatus;
bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
- data.dptr = (uint8_t*)name;
- data.dsize = strlen(name)+1;
+ data = string_term_tdb_data(name);
status = ctdbd_control(conn, CTDB_CURRENT_NODE,
persistent
/*
* force the migration of a record to this node
*/
-NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32 db_id,
+NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
TDB_DATA key)
{
struct ctdb_req_call req;
}
/*
- * remotely fetch a record (read-only)
+ * Fetch a record and parse it
*/
-NTSTATUS ctdbd_fetch(struct ctdbd_connection *conn, uint32 db_id,
- TDB_DATA key, TALLOC_CTX *mem_ctx, TDB_DATA *data,
- bool local_copy)
+NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
+ TDB_DATA key, bool local_copy,
+ void (*parser)(TDB_DATA key, TDB_DATA data,
+ void *private_data),
+ void *private_data)
{
struct ctdb_req_call req;
struct ctdb_reply_call *reply;
goto fail;
}
- data->dsize = reply->datalen;
- if (data->dsize == 0) {
- data->dptr = NULL;
- goto done;
- }
-
- data->dptr = (uint8 *)talloc_memdup(mem_ctx, &reply->data[0],
- reply->datalen);
- if (data->dptr == NULL) {
- DEBUG(0, ("talloc failed\n"));
- status = NT_STATUS_NO_MEMORY;
+ if (reply->datalen == 0) {
+ /*
+ * Treat an empty record as non-existing
+ */
+ status = NT_STATUS_NOT_FOUND;
goto fail;
}
- done:
+ parser(key, make_tdb_data(&reply->data[0], reply->datalen),
+ private_data);
+
status = NT_STATUS_OK;
fail:
TALLOC_FREE(reply);
everything in-line.
*/
-NTSTATUS ctdbd_traverse(uint32 db_id,
+NTSTATUS ctdbd_traverse(uint32_t db_id,
void (*fn)(TDB_DATA key, TDB_DATA data,
void *private_data),
void *private_data)
#ifdef HAVE_IPV6
if (in->ss_family == AF_INET6) {
const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
- const struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)in;
+ const struct sockaddr_in6 *in6 =
+ (const struct sockaddr_in6 *)in;
struct sockaddr_in *out4 = (struct sockaddr_in *)out;
if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
memset(out, 0, sizeof(*out));
/*
call a control on the local node
*/
-NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32 opcode,
- uint64_t srvid, uint32_t flags, TDB_DATA data,
+NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
+ uint64_t srvid, uint32_t flags, TDB_DATA data,
TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
int *cstatus)
{
return status;
}
+#else
+
+NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
+ uint32_t dst_vnn, uint64_t dst_srvid,
+ const uint8_t *buf, size_t buflen)
+{
+ return NT_STATUS_NOT_IMPLEMENTED;
+}
+
#endif