X-Git-Url: http://git.samba.org/?a=blobdiff_plain;f=ctdb%2Fclient%2Fctdb_client.c;h=da18826fa59d12e64ef5b7231e54c44047ae77ab;hb=e0bf5dd4566785b41ad1fa0492a9f215639f1685;hp=1190fba8b2ac9a996bb08d2e2222c97c2581e269;hpb=86cd78efeef8e6304311f6d5511706465a1bb973;p=obnox%2Fsamba%2Fsamba-obnox.git diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c index 1190fba8b2a..da18826fa59 100644 --- a/ctdb/client/ctdb_client.c +++ b/ctdb/client/ctdb_client.c @@ -19,10 +19,9 @@ */ #include "includes.h" -#include "db_wrap.h" -#include "lib/tdb/include/tdb.h" +#include "lib/tdb_wrap/tdb_wrap.h" +#include "tdb.h" #include "lib/util/dlinklist.h" -#include "lib/tevent/tevent.h" #include "system/network.h" #include "system/filesys.h" #include "system/locale.h" @@ -30,8 +29,6 @@ #include "../include/ctdb_private.h" #include "lib/util/dlinklist.h" -pid_t ctdbd_pid; - /* allocate a packet for use in client<->daemon communication */ @@ -47,18 +44,17 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb, length = MAX(length, slength); size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); - hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size); + hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size); if (hdr == NULL) { DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n", operation, (unsigned)length)); return NULL; } talloc_set_name_const(hdr, type); - memset(hdr, 0, slength); hdr->length = length; hdr->operation = operation; hdr->ctdb_magic = CTDB_MAGIC; - hdr->ctdb_version = CTDB_VERSION; + hdr->ctdb_version = CTDB_PROTOCOL; hdr->srcnode = ctdb->pnn; if (ctdb->vnn_map) { hdr->generation = ctdb->vnn_map->generation; @@ -201,8 +197,8 @@ void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) talloc_steal(tmp_ctx, hdr); if (cnt == 0) { - DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n")); - exit(0); + DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n")); + exit(1); } if (cnt < 
sizeof(*hdr)) { @@ -220,7 +216,7 @@ void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) goto done; } - if (hdr->ctdb_version != CTDB_VERSION) { + if (hdr->ctdb_version != CTDB_PROTOCOL) { ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version); goto done; } @@ -247,57 +243,32 @@ done: } /* - connect with exponential backoff, thanks Stevens + connect to a unix domain socket */ -#define CONNECT_MAXSLEEP 64 -static int ctdb_connect_retry(struct ctdb_context *ctdb) +int ctdb_socket_connect(struct ctdb_context *ctdb) { struct sockaddr_un addr; - int secs; - int ret = 0; memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); - - for (secs = 1; secs <= CONNECT_MAXSLEEP; secs *= 2) { - ret = connect(ctdb->daemon.sd, (struct sockaddr *)&addr, - sizeof(addr)); - if ((ret == 0) || (errno != EAGAIN)) { - break; - } - - if (secs <= (CONNECT_MAXSLEEP / 2)) { - DEBUG(DEBUG_ERR,("connect failed: %s, retry in %d second(s)\n", - strerror(errno), secs)); - sleep(secs); - } - } - - return ret; -} + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1); -/* - connect to a unix domain socket -*/ -int ctdb_socket_connect(struct ctdb_context *ctdb) -{ ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); if (ctdb->daemon.sd == -1) { DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno)); return -1; } - set_nonblocking(ctdb->daemon.sd); - set_close_on_exec(ctdb->daemon.sd); - - if (ctdb_connect_retry(ctdb) == -1) { - DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno)); + if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { close(ctdb->daemon.sd); ctdb->daemon.sd = -1; + DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. 
Errno:%s(%d)\n", strerror(errno), errno)); return -1; } + set_nonblocking(ctdb->daemon.sd); + set_close_on_exec(ctdb->daemon.sd); + ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, CTDB_DS_ALIGNMENT, ctdb_client_read_cb, ctdb, "to-ctdbd"); @@ -503,11 +474,10 @@ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, ctdb_msg_fn_t handler, void *private_data) - { int res; int32_t status; - + res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, tdb_null, NULL, NULL, &status, NULL, NULL); if (res != 0 || status != 0) { @@ -526,7 +496,7 @@ int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid { int res; int32_t status; - + res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, tdb_null, NULL, NULL, &status, NULL, NULL); if (res != 0 || status != 0) { @@ -539,6 +509,42 @@ int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid return 0; } +/* + * check server ids + */ +int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num, + uint8_t *result) +{ + TDB_DATA indata, outdata; + int res; + int32_t status; + int i; + + indata.dptr = (uint8_t *)ids; + indata.dsize = num * sizeof(*ids); + + res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0, + indata, ctdb, &outdata, &status, NULL, NULL); + if (res != 0 || status != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n")); + return -1; + } + + if (outdata.dsize != num*sizeof(uint8_t)) { + DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n", + (long unsigned int)num*sizeof(uint8_t), + outdata.dsize)); + talloc_free(outdata.dptr); + return -1; + } + + for (i=0; idata[0], data.dptr, data.dsize); res = ctdb_client_queue_pkt(ctdb, &r->hdr); - if (res != 0) { - return res; - } - talloc_free(r); - return 0; + return 
res; } @@ -705,6 +707,21 @@ again: goto again; } + /* if this is a request for read/write and we have delegations + we have to revoke all delegations first + */ + if ((h->header.dmaster == ctdb_db->ctdb->pnn) && + (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n")); return h; } @@ -1159,6 +1176,17 @@ int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, state = ctdb_control_send(ctdb, destnode, srvid, opcode, flags, data, mem_ctx, timeout, errormsg); + + /* FIXME: Error conditions in ctdb_control_send return NULL without + * setting errormsg. So, there is no way to distinguish between sucess + * and failure when CTDB_CTRL_FLAG_NOREPLY is set */ + if (flags & CTDB_CTRL_FLAG_NOREPLY) { + if (status != NULL) { + *status = 0; + } + return 0; + } + return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, errormsg); } @@ -1218,6 +1246,61 @@ int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ct return 0; } +/* + * get db statistics + */ +int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, + TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat) +{ + int ret; + TDB_DATA indata, outdata; + int32_t res; + struct ctdb_db_statistics *wire, *s; + char *ptr; + int i; + + indata.dptr = (uint8_t *)&dbid; + indata.dsize = sizeof(dbid); + + ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS, + 0, indata, ctdb, &outdata, &res, NULL, NULL); + if (ret != 0 || res != 0) { + DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n")); + return -1; + } + + if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) { + DEBUG(DEBUG_ERR,(__location__ " 
Wrong dbstatistics size %zi - expected >= %lu\n", + outdata.dsize, + (long unsigned int)sizeof(struct ctdb_statistics))); + return -1; + } + + s = talloc_zero(mem_ctx, struct ctdb_db_statistics); + if (s == NULL) { + talloc_free(outdata.dptr); + CTDB_NO_MEMORY(ctdb, s); + } + + wire = (struct ctdb_db_statistics *)outdata.dptr; + *s = *wire; + ptr = &wire->hot_keys_wire[0]; + for (i=0; inum_hot_keys; i++) { + s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize); + if (s->hot_keys[i].key.dptr == NULL) { + talloc_free(outdata.dptr); + CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr); + } + + memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize); + ptr += wire->hot_keys[i].key.dsize; + } + + talloc_free(outdata.dptr); + *dbstat = s; + return 0; +} + /* shutdown a remote ctdb node */ @@ -1662,6 +1745,36 @@ int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode) return res; } +int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, + struct timeval timeout, + uint32_t destnode, + uint32_t *runstate) +{ + TDB_DATA outdata; + int32_t res; + int ret; + + ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0, + tdb_null, ctdb, &outdata, &res, &timeout, NULL); + if (ret != 0 || res != 0) { + DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n")); + return ret != 0 ? 
ret : res; + } + + if (outdata.dsize != sizeof(uint32_t)) { + DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n")); + talloc_free(outdata.dptr); + return -1; + } + + if (runstate != NULL) { + *runstate = *(uint32_t *)outdata.dptr; + } + talloc_free(outdata.dptr); + + return 0; +} + /* find the real path to a ltdb */ @@ -1760,6 +1873,40 @@ int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb, return 0; } +/* + * get db sequence number + */ +int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout, + uint32_t destnode, uint32_t dbid, uint64_t *seqnum) +{ + int ret; + int32_t res; + TDB_DATA data, outdata; + + data.dptr = (uint8_t *)&dbid; + data.dsize = sizeof(uint64_t); /* This is just wrong */ + + ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM, + 0, data, ctdb, &outdata, &res, &timeout, NULL); + if (ret != 0 || res != 0) { + DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n")); + return -1; + } + + if (outdata.dsize != sizeof(uint64_t)) { + DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n")); + talloc_free(outdata.dptr); + return -1; + } + + if (seqnum != NULL) { + *seqnum = *(uint64_t *)outdata.dptr; + } + talloc_free(outdata.dptr); + + return 0; +} + /* create a database */ @@ -1769,11 +1916,23 @@ int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32 int ret; int32_t res; TDB_DATA data; + uint64_t tdb_flags = 0; data.dptr = discard_const(name); data.dsize = strlen(name)+1; - ret = ctdb_control(ctdb, destnode, 0, + /* Make sure that volatile databases use jenkins hash */ + if (!persistent) { + tdb_flags = TDB_INCOMPATIBLE_HASH; + } + +#ifdef TDB_MUTEX_LOCKING + if (!persistent && ctdb->tunable.mutex_enabled == 1) { + tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST); + } +#endif + + ret = ctdb_control(ctdb, destnode, tdb_flags, persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 0, data, mem_ctx, &data, &res, &timeout, NULL); @@ -1896,6 +2055,9 @@ struct 
ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, TDB_DATA data; int ret; int32_t res; +#ifdef TDB_MUTEX_LOCKING + uint32_t mutex_enabled = 0; +#endif ctdb_db = ctdb_db_handle(ctdb, name); if (ctdb_db) { @@ -1912,6 +2074,30 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, data.dptr = discard_const(name); data.dsize = strlen(name)+1; + /* CTDB has switched to using jenkins hash for volatile databases. + * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH, + * always set it. + */ + if (!persistent) { + tdb_flags |= TDB_INCOMPATIBLE_HASH; + } + +#ifdef TDB_MUTEX_LOCKING + if (!persistent) { + ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0), + CTDB_CURRENT_NODE, + "TDBMutexEnabled", + &mutex_enabled); + if (ret != 0) { + DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n")); + } + + if (mutex_enabled == 1) { + tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST); + } + } +#endif + /* tell ctdb daemon to attach */ ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, @@ -1932,13 +2118,23 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, return NULL; } - tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC; + if (persistent) { + tdb_flags = TDB_DEFAULT; + } else { + tdb_flags = TDB_NOSYNC; +#ifdef TDB_MUTEX_LOCKING + if (mutex_enabled) { + tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST); + } +#endif + } if (ctdb->valgrinding) { tdb_flags |= TDB_NOMMAP; } tdb_flags |= TDB_DISALLOW_NESTING; - ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0); + ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags, + O_RDWR, 0); if (ctdb_db->ltdb == NULL) { ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path); talloc_free(ctdb_db); @@ -1957,6 +2153,25 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, return ctdb_db; } +/* + * detach from a specific database - client call + */ +int 
ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id) +{ + int ret; + int32_t status; + TDB_DATA data; + + data.dsize = sizeof(db_id); + data.dptr = (uint8_t *)&db_id; + + ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH, + 0, data, NULL, NULL, &status, NULL, NULL); + if (ret != 0 || status != 0) { + return -1; + } + return 0; +} /* setup a call for a database @@ -2017,7 +2232,7 @@ static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA if (data.dsize < sizeof(uint32_t) || d->length != data.dsize) { DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize)); - state->done = True; + state->done = true; return; } @@ -2028,7 +2243,7 @@ static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA if (key.dsize == 0 && data.dsize == 0) { /* end of traverse */ - state->done = True; + state->done = true; return; } @@ -2040,7 +2255,7 @@ static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA } if (state->fn(ctdb, key, data, state->private_data) != 0) { - state->done = True; + state->done = true; } state->count++; @@ -2059,13 +2274,13 @@ static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db, void *private_data) { TDB_DATA data; - struct ctdb_traverse_start t; + struct ctdb_traverse_start_ext t; int32_t status; int ret; uint64_t srvid = (getpid() | 0xFLL<<60); struct traverse_state state; - state.done = False; + state.done = false; state.count = 0; state.private_data = private_data; state.fn = fn; @@ -2085,7 +2300,7 @@ static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db, data.dptr = (uint8_t *)&t; data.dsize = sizeof(t); - ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0, + ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0, data, NULL, NULL, &status, NULL, NULL); if (ret != 0 || status != 0) { DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n")); @@ -2146,15 +2361,21 @@ int 
ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, v fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key)); } - fprintf(f, "flags: 0x%08x", h->flags); - if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA"); - if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED"); - if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC"); - if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS"); - if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY"); - if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY"); - if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE"); - fprintf(f, "\n"); + if (c->printhash) { + fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key)); + } + + if (c->printrecordflags) { + fprintf(f, "flags: 0x%08x", h->flags); + if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA"); + if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED"); + if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC"); + if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS"); + if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY"); + if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY"); + if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE"); + fprintf(f, "\n"); + } if (c->printdatasize) { fprintf(f, "data size: %u\n", (unsigned)data.dsize); @@ -2474,7 +2695,7 @@ int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, talloc_free(data.dptr); if (ret != 0 || res != 0) { DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n")); - return -1; + return ret != 0 ? 
ret : res; } if (outdata.dsize != sizeof(uint32_t)) { @@ -3176,7 +3397,7 @@ struct ctdb_context *ctdb_init(struct event_context *ev) ctdb->lastid = INT_MAX-200; CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr); - ret = ctdb_set_socketname(ctdb, CTDB_PATH); + ret = ctdb_set_socketname(ctdb, CTDB_SOCKET); if (ret != 0) { DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n")); talloc_free(ctdb); @@ -3287,9 +3508,12 @@ static void async_callback(struct ctdb_client_control_state *state) struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context); int ret; TDB_DATA outdata; - int32_t res; + int32_t res = -1; uint32_t destnode = state->c->hdr.destnode; + outdata.dsize = 0; + outdata.dptr = NULL; + /* one more node has responded with recmode data */ data->count--; @@ -3301,6 +3525,11 @@ static void async_callback(struct ctdb_client_control_state *state) DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode)); } data->fail_count++; + if (state->state == CTDB_CONTROL_TIMEOUT) { + res = -ETIME; + } else { + res = -1; + } if (data->fail_callback) { data->fail_callback(ctdb, destnode, res, outdata, data->callback_data); @@ -3439,19 +3668,23 @@ uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb, return nodes; } -uint32_t *list_of_active_nodes(struct ctdb_context *ctdb, - struct ctdb_node_map *node_map, - TALLOC_CTX *mem_ctx, - bool include_self) +/* Get list of nodes not including those with flags specified by mask. + * If exclude_pnn is not -1 then exclude that pnn from the list. 
+ */ +uint32_t *list_of_nodes(struct ctdb_context *ctdb, + struct ctdb_node_map *node_map, + TALLOC_CTX *mem_ctx, + uint32_t mask, + int exclude_pnn) { int i, j, num_nodes; uint32_t *nodes; for (i=num_nodes=0;inum;i++) { - if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) { + if (node_map->nodes[i].flags & mask) { continue; } - if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) { + if (node_map->nodes[i].pnn == exclude_pnn) { continue; } num_nodes++; @@ -3461,10 +3694,10 @@ uint32_t *list_of_active_nodes(struct ctdb_context *ctdb, CTDB_NO_MEMORY_FATAL(ctdb, nodes); for (i=j=0;inum;i++) { - if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) { + if (node_map->nodes[i].flags & mask) { continue; } - if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) { + if (node_map->nodes[i].pnn == exclude_pnn) { continue; } nodes[j++] = node_map->nodes[i].pnn; @@ -3473,38 +3706,13 @@ uint32_t *list_of_active_nodes(struct ctdb_context *ctdb, return nodes; } -uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb, +uint32_t *list_of_active_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *node_map, TALLOC_CTX *mem_ctx, - uint32_t pnn) + bool include_self) { - int i, j, num_nodes; - uint32_t *nodes; - - for (i=num_nodes=0;inum;i++) { - if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) { - continue; - } - if (node_map->nodes[i].pnn == pnn) { - continue; - } - num_nodes++; - } - - nodes = talloc_array(mem_ctx, uint32_t, num_nodes); - CTDB_NO_MEMORY_FATAL(ctdb, nodes); - - for (i=j=0;inum;i++) { - if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) { - continue; - } - if (node_map->nodes[i].pnn == pnn) { - continue; - } - nodes[j++] = node_map->nodes[i].pnn; - } - - return nodes; + return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE, + include_self ? 
-1 : ctdb->pnn); } uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb, @@ -3512,33 +3720,8 @@ uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, bool include_self) { - int i, j, num_nodes; - uint32_t *nodes; - - for (i=num_nodes=0;inum;i++) { - if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) { - continue; - } - if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) { - continue; - } - num_nodes++; - } - - nodes = talloc_array(mem_ctx, uint32_t, num_nodes); - CTDB_NO_MEMORY_FATAL(ctdb, nodes); - - for (i=j=0;inum;i++) { - if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) { - continue; - } - if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) { - continue; - } - nodes[j++] = node_map->nodes[i].pnn; - } - - return nodes; + return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED, + include_self ? -1 : ctdb->pnn); } /* @@ -3617,189 +3800,366 @@ int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, return ret; } -/** - * check whether a transaction is active on a given db on a given node - */ -int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb, - uint32_t destnode, - uint32_t db_id) +struct server_id { + uint64_t pid; + uint32_t task_id; + uint32_t vnn; + uint64_t unique_id; +}; + +static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid) { - int32_t status; - int ret; - TDB_DATA indata; + struct server_id id; - indata.dptr = (uint8_t *)&db_id; - indata.dsize = sizeof(db_id); + id.pid = getpid(); + id.task_id = reqid; + id.vnn = ctdb_get_pnn(ctdb); + id.unique_id = id.vnn; + id.unique_id = (id.unique_id << 32) | reqid; - ret = ctdb_control(ctdb, destnode, 0, - CTDB_CONTROL_TRANS2_ACTIVE, - 0, indata, NULL, NULL, &status, - NULL, NULL); + return id; +} - if (ret != 0) { - DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n")); - return -1; +/* This is basically a copy from Samba's server_id.*. 
However, a + * dependency chain stops us from using Samba's version, so use a + * renamed copy until a better solution is found. */ +static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2) +{ + if (id1->pid != id2->pid) { + return false; } - return status; -} + if (id1->task_id != id2->task_id) { + return false; + } + if (id1->vnn != id2->vnn) { + return false; + } -struct ctdb_transaction_handle { - struct ctdb_db_context *ctdb_db; - bool in_replay; - /* - * we store the reads and writes done under a transaction: - * - one list stores both reads and writes (m_all), - * - the other just writes (m_write) - */ - struct ctdb_marshall_buffer *m_all; - struct ctdb_marshall_buffer *m_write; -}; + if (id1->unique_id != id2->unique_id) { + return false; + } -/* start a transaction on a database */ -static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h) -{ - tdb_transaction_cancel(h->ctdb_db->ltdb->tdb); - return 0; + return true; } -/* start a transaction on a database */ -static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h) +static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id) { - struct ctdb_record_handle *rh; - TDB_DATA key; - TDB_DATA data; - struct ctdb_ltdb_header header; - TALLOC_CTX *tmp_ctx; - const char *keyname = CTDB_TRANSACTION_LOCK_KEY; + struct ctdb_server_id sid; int ret; - struct ctdb_db_context *ctdb_db = h->ctdb_db; - pid_t pid; - int32_t status; + uint32_t result; - key.dptr = discard_const(keyname); - key.dsize = strlen(keyname); + sid.type = SERVER_TYPE_SAMBA; + sid.pnn = id->vnn; + sid.server_id = id->pid; - if (!ctdb_db->persistent) { - DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n")); - return -1; + ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0), + id->vnn, &sid, &result); + if (ret != 0) { + /* If control times out, assume server_id exists. 
*/ + return true; + } + + if (result) { + return true; + } + + return false; +} + + +enum g_lock_type { + G_LOCK_READ = 0, + G_LOCK_WRITE = 1, +}; + +struct g_lock_rec { + enum g_lock_type type; + struct server_id id; +}; + +struct g_lock_recs { + unsigned int num; + struct g_lock_rec *lock; +}; + +static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data, + struct g_lock_recs **locks) +{ + struct g_lock_recs *recs; + + recs = talloc_zero(mem_ctx, struct g_lock_recs); + if (recs == NULL) { + return false; + } + + if (data.dsize == 0) { + goto done; } + if (data.dsize % sizeof(struct g_lock_rec) != 0) { + DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n", + (unsigned long)data.dsize)); + talloc_free(recs); + return false; + } + + recs->num = data.dsize / sizeof(struct g_lock_rec); + recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize); + if (recs->lock == NULL) { + talloc_free(recs); + return false; + } + +done: + if (locks != NULL) { + *locks = recs; + } + + return true; +} + + +static bool g_lock_lock(TALLOC_CTX *mem_ctx, + struct ctdb_db_context *ctdb_db, + const char *keyname, uint32_t reqid) +{ + TDB_DATA key, data; + struct ctdb_record_handle *h; + struct g_lock_recs *locks; + struct server_id id; + struct timeval t_start; + int i; + + key.dptr = (uint8_t *)discard_const(keyname); + key.dsize = strlen(keyname) + 1; + + t_start = timeval_current(); + again: - tmp_ctx = talloc_new(h); + /* Keep trying for an hour. 
*/ + if (timeval_elapsed(&t_start) > 3600) { + return false; + } - rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL); - if (rh == NULL) { - DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n")); - talloc_free(tmp_ctx); - return -1; + h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data); + if (h == NULL) { + return false; } - status = ctdb_ctrl_transaction_active(ctdb_db->ctdb, - CTDB_CURRENT_NODE, - ctdb_db->db_id); - if (status == 1) { - unsigned long int usec = (1000 + random()) % 100000; - DEBUG(DEBUG_DEBUG, (__location__ " transaction is active " - "on db_id[0x%08x]. waiting for %lu " - "microseconds\n", - ctdb_db->db_id, usec)); - talloc_free(tmp_ctx); - usleep(usec); + if (!g_lock_parse(h, data, &locks)) { + DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n")); + talloc_free(data.dptr); + talloc_free(h); + return false; + } + + talloc_free(data.dptr); + + id = server_id_get(ctdb_db->ctdb, reqid); + + i = 0; + while (i < locks->num) { + if (ctdb_server_id_equal(&locks->lock[i].id, &id)) { + /* Internal error */ + talloc_free(h); + return false; + } + + if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) { + if (i < locks->num-1) { + locks->lock[i] = locks->lock[locks->num-1]; + } + locks->num--; + continue; + } + + /* This entry is locked. */ + DEBUG(DEBUG_INFO, ("g_lock: lock already granted for " + "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n", + (unsigned long long)id.pid, + id.task_id, id.vnn, + (unsigned long long)id.unique_id)); + talloc_free(h); goto again; } - /* - * store the pid in the database: - * it is not enough that the node is dmaster... 
- */ - pid = getpid(); - data.dptr = (unsigned char *)&pid; - data.dsize = sizeof(pid_t); - rh->header.rsn++; - rh->header.dmaster = ctdb_db->ctdb->pnn; - ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data); - if (ret != 0) { - DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in " - "transaction record\n")); - talloc_free(tmp_ctx); - return -1; + locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec, + locks->num+1); + if (locks->lock == NULL) { + talloc_free(h); + return false; } - talloc_free(rh); + locks->lock[locks->num].type = G_LOCK_WRITE; + locks->lock[locks->num].id = id; + locks->num++; - ret = tdb_transaction_start(ctdb_db->ltdb->tdb); - if (ret != 0) { - DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n")); - talloc_free(tmp_ctx); - return -1; + data.dptr = (uint8_t *)locks->lock; + data.dsize = locks->num * sizeof(struct g_lock_rec); + + if (ctdb_record_store(h, data) != 0) { + DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for " + "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n", + (unsigned long long)id.pid, + id.task_id, id.vnn, + (unsigned long long)id.unique_id)); + talloc_free(h); + return false; } - ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data); - if (ret != 0) { - DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction " - "lock record inside transaction\n")); - tdb_transaction_cancel(ctdb_db->ltdb->tdb); - talloc_free(tmp_ctx); - goto again; + DEBUG(DEBUG_INFO, ("g_lock: lock granted for " + "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n", + (unsigned long long)id.pid, + id.task_id, id.vnn, + (unsigned long long)id.unique_id)); + + talloc_free(h); + return true; +} + +static bool g_lock_unlock(TALLOC_CTX *mem_ctx, + struct ctdb_db_context *ctdb_db, + const char *keyname, uint32_t reqid) +{ + TDB_DATA key, data; + struct ctdb_record_handle *h; + struct g_lock_recs *locks; + struct server_id id; + int i; + bool found = false; + + key.dptr = (uint8_t *)discard_const(keyname); + key.dsize 
= strlen(keyname) + 1; + h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data); + if (h == NULL) { + return false; } - if (header.dmaster != ctdb_db->ctdb->pnn) { - DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on " - "transaction lock record\n")); - tdb_transaction_cancel(ctdb_db->ltdb->tdb); - talloc_free(tmp_ctx); - goto again; + if (!g_lock_parse(h, data, &locks)) { + DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n")); + talloc_free(data.dptr); + talloc_free(h); + return false; } - if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) { - DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in " - "the transaction lock record\n")); - tdb_transaction_cancel(ctdb_db->ltdb->tdb); - talloc_free(tmp_ctx); - goto again; + talloc_free(data.dptr); + + id = server_id_get(ctdb_db->ctdb, reqid); + + for (i=0; inum; i++) { + if (ctdb_server_id_equal(&locks->lock[i].id, &id)) { + if (i < locks->num-1) { + locks->lock[i] = locks->lock[locks->num-1]; + } + locks->num--; + found = true; + break; + } } - talloc_free(tmp_ctx); + if (!found) { + DEBUG(DEBUG_ERR, ("g_lock: lock not found\n")); + talloc_free(h); + return false; + } + + data.dptr = (uint8_t *)locks->lock; + data.dsize = locks->num * sizeof(struct g_lock_rec); + + if (ctdb_record_store(h, data) != 0) { + talloc_free(h); + return false; + } + + talloc_free(h); + return true; +} + +struct ctdb_transaction_handle { + struct ctdb_db_context *ctdb_db; + struct ctdb_db_context *g_lock_db; + char *lock_name; + uint32_t reqid; + /* + * we store reads and writes done under a transaction: + * - one list stores both reads and writes (m_all) + * - the other just writes (m_write) + */ + struct ctdb_marshall_buffer *m_all; + struct ctdb_marshall_buffer *m_write; +}; + +static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h) +{ + g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid); + ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid); return 0; } -/* start a transaction on a database */ +/** 
+ * start a transaction on a database + */ struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx) { struct ctdb_transaction_handle *h; - int ret; + struct ctdb_server_id id; h = talloc_zero(mem_ctx, struct ctdb_transaction_handle); if (h == NULL) { - DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n")); + DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n")); return NULL; } h->ctdb_db = ctdb_db; + h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", + (unsigned int)ctdb_db->db_id); + if (h->lock_name == NULL) { + DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n")); + talloc_free(h); + return NULL; + } - ret = ctdb_transaction_fetch_start(h); - if (ret != 0) { + h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0), + "g_lock.tdb", false, 0); + if (!h->g_lock_db) { + DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n")); talloc_free(h); return NULL; } - talloc_set_destructor(h, ctdb_transaction_destructor); + id.type = SERVER_TYPE_SAMBA; + id.pnn = ctdb_get_pnn(ctdb_db->ctdb); + id.server_id = getpid(); - return h; -} + if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0), + &id) != 0) { + DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n")); + talloc_free(h); + return NULL; + } + h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h); + if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) { + DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n")); + talloc_free(h); + return NULL; + } -/* - fetch a record inside a transaction + talloc_set_destructor(h, ctdb_transaction_destructor); + return h; +} + +/** + * fetch a record inside a transaction */ -int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, - TALLOC_CTX *mem_ctx, +int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, + TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) { struct ctdb_ltdb_header header; @@ -3813,26 +4173,24 @@ int 
ctdb_transaction_fetch(struct ctdb_transaction_handle *h, *data = tdb_null; ret = 0; } - + if (ret != 0) { return ret; } - if (!h->in_replay) { - h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data); - if (h->m_all == NULL) { - DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); - return -1; - } + h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data); + if (h->m_all == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + return -1; } return 0; } -/* - stores a record inside a transaction +/** + * stores a record inside a transaction */ -int ctdb_transaction_store(struct ctdb_transaction_handle *h, +int ctdb_transaction_store(struct ctdb_transaction_handle *h, TDB_DATA key, TDB_DATA data) { TALLOC_CTX *tmp_ctx = talloc_new(h); @@ -3840,8 +4198,6 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h, TDB_DATA olddata; int ret; - ZERO_STRUCT(header); - /* we need the header so we can update the RSN */ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata); if (ret == -1 && header.dmaster == (uint32_t)-1) { @@ -3856,7 +4212,8 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h, } if (data.dsize == olddata.dsize && - memcmp(data.dptr, olddata.dptr, data.dsize) == 0) { + memcmp(data.dptr, olddata.dptr, data.dsize) == 0 && + header.rsn != 0) { /* save writing the same data */ talloc_free(tmp_ctx); return 0; @@ -3865,14 +4222,12 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h, header.dmaster = h->ctdb_db->ctdb->pnn; header.rsn++; - if (!h->in_replay) { - h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data); - if (h->m_all == NULL) { - DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); - talloc_free(tmp_ctx); - return -1; - } - } + h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data); + if (h->m_all == NULL) { + 
DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + talloc_free(tmp_ctx); + return -1; + } h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data); if (h->m_write == NULL) { @@ -3880,187 +4235,142 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h, talloc_free(tmp_ctx); return -1; } - - ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data); talloc_free(tmp_ctx); - - return ret; + return 0; } -/* - replay a transaction - */ -static int ctdb_replay_transaction(struct ctdb_transaction_handle *h) +static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum) { - int ret, i; - struct ctdb_rec_data *rec = NULL; + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; + struct ctdb_ltdb_header header; + int ret; - h->in_replay = true; - talloc_free(h->m_write); - h->m_write = NULL; + key.dptr = (uint8_t *)discard_const(keyname); + key.dsize = strlen(keyname) + 1; - ret = ctdb_transaction_fetch_start(h); + ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data); if (ret != 0) { - return ret; + *seqnum = 0; + return 0; } - for (i=0;i<h->m_all->count;i++) { - TDB_DATA key, data; + if (data.dsize == 0) { + *seqnum = 0; + return 0; + } - rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data); - if (rec == NULL) { - DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n")); - goto failed; - } + if (data.dsize != sizeof(*seqnum)) { + DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n", + data.dsize)); + talloc_free(data.dptr); + return -1; + } - if (rec->reqid == 0) { - /* its a store */ - if (ctdb_transaction_store(h, key, data) != 0) { - goto failed; - } - } else { - TDB_DATA data2; - TALLOC_CTX *tmp_ctx = talloc_new(h); + *seqnum = *(uint64_t *)data.dptr; + talloc_free(data.dptr); - if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) { - talloc_free(tmp_ctx); - goto failed; - } - if (data2.dsize != data.dsize || - 
memcmp(data2.dptr, data.dptr, data.dsize) != 0) { - /* the record has changed on us - we have to give up */ - talloc_free(tmp_ctx); - goto failed; - } - talloc_free(tmp_ctx); - } - } - return 0; - -failed: - tdb_transaction_cancel(h->ctdb_db->ltdb->tdb); - return -1; } -/* - commit a transaction - */ -int ctdb_transaction_commit(struct ctdb_transaction_handle *h) +static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h, + uint64_t seqnum) { - int ret, retries=0; - int32_t status; - struct ctdb_context *ctdb = h->ctdb_db->ctdb; - struct timeval timeout; - enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR; + const char *keyname = CTDB_DB_SEQNUM_KEY; + TDB_DATA key, data; - talloc_set_destructor(h, NULL); + key.dptr = (uint8_t *)discard_const(keyname); + key.dsize = strlen(keyname) + 1; - /* our commit strategy is quite complex. + data.dptr = (uint8_t *)&seqnum; + data.dsize = sizeof(seqnum); - - we first try to commit the changes to all other nodes + return ctdb_transaction_store(h, key, data); +} - - if that works, then we commit locally and we are done - - if a commit on another node fails, then we need to cancel - the transaction, then restart the transaction (thus - opening a window of time for a pending recovery to - complete), then replay the transaction, checking all the - reads and writes (checking that reads give the same data, - and writes succeed). 
Then we retry the transaction to the - other nodes - */ +/** + * commit a transaction + */ +int ctdb_transaction_commit(struct ctdb_transaction_handle *h) +{ + int ret; + uint64_t old_seqnum, new_seqnum; + int32_t status; + struct timeval timeout; -again: if (h->m_write == NULL) { /* no changes were made */ - tdb_transaction_cancel(h->ctdb_db->ltdb->tdb); talloc_free(h); return 0; } - /* tell ctdbd to commit to the other nodes */ - timeout = timeval_current_ofs(1, 0); - ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, - retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, - ctdb_marshall_finish(h->m_write), NULL, NULL, &status, - &timeout, NULL); - if (ret != 0 || status != 0) { - tdb_transaction_cancel(h->ctdb_db->ltdb->tdb); - DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed" - ", retrying after 1 second...\n", - (retries==0)?"":"retry ")); - sleep(1); + ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n")); + ret = -1; + goto done; + } + new_seqnum = old_seqnum + 1; + ret = ctdb_store_db_seqnum(h, new_seqnum); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n")); + ret = -1; + goto done; + } + +again: + timeout = timeval_current_ofs(3,0); + ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, + h->ctdb_db->db_id, + CTDB_CONTROL_TRANS3_COMMIT, 0, + ctdb_marshall_finish(h->m_write), NULL, NULL, + &status, &timeout, NULL); + if (ret != 0 || status != 0) { + /* + * TRANS3_COMMIT control will only fail if recovery has been + * triggered. Check if the database has been updated or not. 
+ */ + ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum); if (ret != 0) { - failure_control = CTDB_CONTROL_TRANS2_ERROR; - } else { - /* work out what error code we will give if we - have to fail the operation */ - switch ((enum ctdb_trans2_commit_error)status) { - case CTDB_TRANS2_COMMIT_SUCCESS: - case CTDB_TRANS2_COMMIT_SOMEFAIL: - case CTDB_TRANS2_COMMIT_TIMEOUT: - failure_control = CTDB_CONTROL_TRANS2_ERROR; - break; - case CTDB_TRANS2_COMMIT_ALLFAIL: - failure_control = CTDB_CONTROL_TRANS2_FINISHED; - break; - } + DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n")); + goto done; } - if (++retries == 100) { - DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", - h->ctdb_db->db_id, retries, (unsigned)failure_control)); - ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, - failure_control, CTDB_CTRL_FLAG_NOREPLY, - tdb_null, NULL, NULL, NULL, NULL, NULL); - talloc_free(h); - return -1; - } - - if (ctdb_replay_transaction(h) != 0) { - DEBUG(DEBUG_ERR, (__location__ " Failed to replay " - "transaction on db 0x%08x, " - "failure control =%u\n", - h->ctdb_db->db_id, - (unsigned)failure_control)); - ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, - failure_control, CTDB_CTRL_FLAG_NOREPLY, - tdb_null, NULL, NULL, NULL, NULL, NULL); - talloc_free(h); - return -1; + if (new_seqnum == old_seqnum) { + /* Database not yet updated, try again */ + goto again; } - goto again; - } else { - failure_control = CTDB_CONTROL_TRANS2_ERROR; - } - /* do the real commit locally */ - ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb); - if (ret != 0) { - DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction " - "on db id 0x%08x locally, " - "failure_control=%u\n", - h->ctdb_db->db_id, - (unsigned)failure_control)); - ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, - failure_control, CTDB_CTRL_FLAG_NOREPLY, - tdb_null, NULL, NULL, NULL, NULL, NULL); - talloc_free(h); - return 
ret; + if (new_seqnum != (old_seqnum + 1)) { + DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n", + (long long unsigned)new_seqnum, + (long long unsigned)old_seqnum)); + ret = -1; + goto done; + } } - /* tell ctdbd that we are finished with our local commit */ - ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, - CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, - tdb_null, NULL, NULL, NULL, NULL, NULL); + ret = 0; + +done: + talloc_free(h); + return ret; +} + +/** + * cancel a transaction + */ +int ctdb_transaction_cancel(struct ctdb_transaction_handle *h) +{ talloc_free(h); return 0; } + /* recovery daemon ping to main daemon */ @@ -4079,9 +4389,11 @@ int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb) return 0; } -/* when forking the main daemon and the child process needs to connect back - * to the daemon as a client process, this function can be used to change - * the ctdb context from daemon into client mode +/* When forking the main daemon and the child process needs to connect + * back to the daemon as a client process, this function can be used + * to change the ctdb context from daemon into client mode. The child + * process must be created using ctdb_fork() and not fork() - + * ctdb_fork() does some necessary housekeeping. */ int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...) { @@ -4093,32 +4405,20 @@ int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ... 
debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":"); va_end(ap); - /* shutdown the transport */ - if (ctdb->methods) { - ctdb->methods->shutdown(ctdb); - } - /* get a new event context */ - talloc_free(ctdb->ev); ctdb->ev = event_context_init(ctdb); tevent_loop_allow_nesting(ctdb->ev); - close(ctdb->daemon.sd); - ctdb->daemon.sd = -1; - - /* the client does not need to be realtime */ - if (ctdb->do_setsched) { - ctdb_restore_scheduler(ctdb); - } - - /* initialise ctdb */ + /* Connect to main CTDB daemon */ ret = ctdb_socket_connect(ctdb); if (ret != 0) { DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n")); return -1; } - return 0; + ctdb->can_send_controls = true; + + return 0; } /* @@ -4464,7 +4764,7 @@ int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, CTDB_CONTROL_GET_DB_PRIORITY, 0, data, tmp_ctx, NULL, &res, &timeout, NULL); if (ret != 0 || res < 0) { - DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n")); + DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n")); talloc_free(tmp_ctx); return -1; } @@ -4615,3 +4915,41 @@ int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid); return ctdb_ctrl_set_db_readonly_recv(ctdb, state); } + +/* + set a database to be sticky + */ +struct ctdb_client_control_state * +ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid) +{ + TDB_DATA data; + + data.dptr = (uint8_t *)&dbid; + data.dsize = sizeof(dbid); + + return ctdb_control_send(ctdb, destnode, 0, + CTDB_CONTROL_SET_DB_STICKY, 0, data, + ctdb, NULL, NULL); +} + +int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state) +{ + int ret; + int32_t res; + + ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL); + if (ret != 0 || res != 0) { + DEBUG(DEBUG_ERR,(__location__ " 
ctdb_ctrl_set_db_sticky_recv failed ret:%d res:%d\n", ret, res)); + return -1; + } + + return 0; +} + +int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid) +{ + struct ctdb_client_control_state *state; + + state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid); + return ctdb_ctrl_set_db_sticky_recv(ctdb, state); +}