*/
#include "includes.h"
-#include "lib/tevent/tevent.h"
-#include "lib/tdb/include/tdb.h"
+#include "tdb.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
#include "system/time.h"
#include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
#include "db_wrap.h"
#include "lib/util/dlinklist.h"
#include <ctype.h>
#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
-/*
- this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
+/**
+ * write a record to a normal database
+ *
+ * This is the server-variant of the ctdb_ltdb_store function.
+ * It contains logic to determine whether a record should be
+ * stored or deleted. It also sends SCHEDULE_FOR_DELETION
+ * controls to the local ctdb daemon if appropriate.
+ */
+static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key,
+ struct ctdb_ltdb_header *header,
+ TDB_DATA data)
{
- return 0;
-}
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ TDB_DATA rec;
+ int ret;
+ bool seqnum_suppressed = false;
+ bool keep = false;
+ bool schedule_for_deletion = false;
+ bool remove_from_delete_queue = false;
+ uint32_t lmaster;
+
+ if (ctdb->flags & CTDB_FLAG_TORTURE) {
+ struct ctdb_ltdb_header *h2;
+ rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
+ h2 = (struct ctdb_ltdb_header *)rec.dptr;
+ /* require a full header in the record before dereferencing
+ h2->rsn: check sizeof(*h2), not the size of the pointer */
+ if (rec.dptr && rec.dsize >= sizeof(*h2) && h2->rsn > header->rsn) {
+ DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
+ (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+ }
+ if (rec.dptr) free(rec.dptr);
+ }
-/*
- this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
-{
- call->reply_data = &call->record_data;
- return 0;
-}
+ if (ctdb->vnn_map == NULL) {
+ /*
+ * Called from a client: always store the record
+ * Also don't call ctdb_lmaster since it uses the vnn_map!
+ */
+ keep = true;
+ goto store;
+ }
+
+ lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
+
+ /*
+ * If we migrate an empty record off to another node
+ * and the record has not been migrated with data,
+ * delete the record instead of storing the empty record.
+ */
+ if (data.dsize != 0) {
+ keep = true;
+ } else if (header->flags & CTDB_REC_RO_FLAGS) {
+ keep = true;
+ } else if (ctdb_db->persistent) {
+ keep = true;
+ } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
+ /*
+ * The record is not created by the client but
+ * automatically by the ctdb_ltdb_fetch logic that
+ * creates a record with an initial header in the
+ * ltdb before trying to migrate the record from
+ * the current lmaster. Keep it instead of trying
+ * to delete the non-existing record...
+ */
+ keep = true;
+ schedule_for_deletion = true;
+ } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
+ keep = true;
+ } else if (ctdb_db->ctdb->pnn == lmaster) {
+ /*
+ * If we are lmaster, then we usually keep the record.
+ * But if we retrieve the dmaster role by a VACUUM_MIGRATE
+ * and the record is empty and has never been migrated
+ * with data, then we should delete it instead of storing it.
+ * This is part of the vacuuming process.
+ *
+ * The reason that we usually need to store even empty records
+ * on the lmaster is that a client operating directly on the
+ * lmaster (== dmaster) expects the local copy of the record to
+ * exist after successful ctdb migrate call. If the record does
+ * not exist, the client goes into a migrate loop and eventually
+ * fails. So storing the empty record makes sure that we do not
+ * need to change the client code.
+ */
+ if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
+ keep = true;
+ } else if (ctdb_db->ctdb->pnn != header->dmaster) {
+ keep = true;
+ }
+ } else if (ctdb_db->ctdb->pnn == header->dmaster) {
+ keep = true;
+ }
+
+ if (keep) {
+ if (!ctdb_db->persistent &&
+ (ctdb_db->ctdb->pnn == header->dmaster) &&
+ !(header->flags & CTDB_REC_RO_FLAGS))
+ {
+ header->rsn++;
+
+ if (data.dsize == 0) {
+ schedule_for_deletion = true;
+ }
+ }
+ remove_from_delete_queue = !schedule_for_deletion;
+ }
+store:
+ /*
+ * The VACUUM_MIGRATED flag is only set temporarily for
+ * the above logic when the record was retrieved by a
+ * VACUUM_MIGRATE call and should not be stored in the
+ * database.
+ *
+ * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
+ * and there are two cases in which the corresponding record
+ * is stored in the local database:
+ * 1. The record has been migrated with data in the past
+ * (the MIGRATED_WITH_DATA record flag is set).
+ * 2. The record has been filled with data again since it
+ * had been submitted in the VACUUM_FETCH message to the
+ * lmaster.
+ * For such records it is important to not store the
+ * VACUUM_MIGRATED flag in the database.
+ */
+ header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
+
+ /*
+ * Similarly, clear the AUTOMATIC flag which should not enter
+ * the local database copy since this would require client
+ * modifications to clear the flag when the client stores
+ * the record.
+ */
+ header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
+
+ rec.dsize = sizeof(*header) + data.dsize;
+ rec.dptr = talloc_size(ctdb, rec.dsize);
+ CTDB_NO_MEMORY(ctdb, rec.dptr);
+
+ memcpy(rec.dptr, header, sizeof(*header));
+ memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+ /* Databases with seqnum updates enabled only get their seqnum
+ changes when/if we modify the data */
+ if (ctdb_db->seqnum_update != NULL) {
+ TDB_DATA old;
+ old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+
+ if ( (old.dsize == rec.dsize)
+ && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
+ rec.dptr+sizeof(struct ctdb_ltdb_header),
+ rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+ tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+ seqnum_suppressed = true;
+ }
+ if (old.dptr) free(old.dptr);
+ }
+
+ DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
+ ctdb_db->db_name,
+ keep?"storing":"deleting",
+ ctdb_hash(&key)));
+
+ if (keep) {
+ ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+ } else {
+ ret = tdb_delete(ctdb_db->ltdb->tdb, key);
+ }
+ if (ret != 0) {
+ int lvl = DEBUG_ERR;
+
+ if (keep == false &&
+ tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
+ {
+ lvl = DEBUG_DEBUG;
+ }
+
+ DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
+ "%d - %s\n",
+ ctdb_db->db_name,
+ keep?"store":"delete", ret,
+ tdb_errorstr(ctdb_db->ltdb->tdb)));
+
+ schedule_for_deletion = false;
+ remove_from_delete_queue = false;
+ }
+ if (seqnum_suppressed) {
+ tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+ }
+
+ talloc_free(rec.dptr);
+
+ if (schedule_for_deletion) {
+ int ret2;
+ ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
+ if (ret2 != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
+ }
+ }
+
+ if (remove_from_delete_queue) {
+ ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
+ }
+
+ return ret;
+}
struct lock_fetch_state {
struct ctdb_context *ctdb;
/*
called when we should retry the operation
*/
-static void lock_fetch_callback(void *p)
+static void lock_fetch_callback(void *p, bool locked)
{
struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
if (!state->ignore_generation &&
1) tries to get the chainlock. If it succeeds, then it returns 0
2) if it fails to get a chainlock immediately then it sets up a
- non-blocking chainlock via ctdb_lockwait, and when it gets the
+ non-blocking chainlock via ctdb_lock_record, and when it gets the
chainlock it re-submits this ctdb request to the main packet
- receive function
+ receive function.
This effectively queues all ctdb requests that cannot be
immediately satisfied until it can get the lock. This means that
{
int ret;
struct tdb_context *tdb = ctdb_db->ltdb->tdb;
- struct lockwait_handle *h;
+ struct lock_request *lreq;
struct lock_fetch_state *state;
ret = tdb_chainlock_nonblock(tdb, key);
state->ignore_generation = ignore_generation;
/* now the contended path */
- h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
- if (h == NULL) {
+ lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
+ if (lreq == NULL) {
return -1;
}
/* we need to move the packet off the temporary context in ctdb_input_pkt(),
so it won't be freed yet */
talloc_steal(state, hdr);
- talloc_steal(state, h);
/* now tell the caller than we will retry asynchronously */
return -2;
return -1;
}
- if (may_recover && !ctdb->done_startup) {
+ if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
ctdb_db->db_name));
ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
return 0;
}
+
+/**
+ * Enable the read-only record property on a database by opening the
+ * per-database read-only tracking tdb ("<db_path>.RO").
+ *
+ * Returns 0 on success (or if the property is already set),
+ * -1 on error. Persistent databases are rejected.
+ */
+int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+ char *ropath;
+
+ /* already enabled - nothing to do */
+ if (ctdb_db->readonly) {
+ return 0;
+ }
+
+ if (ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
+ return -1;
+ }
+
+ ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
+ if (ropath == NULL) {
+ DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
+ return -1;
+ }
+ /* tracking db is ephemeral: unlocked, cleared on first open, unsynced.
+ NOTE(review): mode 0 creates the file without permission bits -
+ presumably only ever accessed through this open handle; confirm. */
+ ctdb_db->rottdb = tdb_open(ropath,
+ ctdb->tunable.database_hash_size,
+ TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
+ O_CREAT|O_RDWR, 0);
+ if (ctdb_db->rottdb == NULL) {
+ DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
+ talloc_free(ropath);
+ return -1;
+ }
+
+ DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
+
+ ctdb_db->readonly = true;
+
+ DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
+
+ talloc_free(ropath);
+ return 0;
+}
+
/*
attach to a database, handling both persistent and non-persistent databases
return 0 on success, -1 on failure
*/
static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
- bool persistent, const char *unhealthy_reason)
+ bool persistent, const char *unhealthy_reason,
+ bool jenkinshash, bool mutexes)
{
struct ctdb_db_context *ctdb_db, *tmp_db;
int ret;
ctdb_db->db_id = ctdb_hash(&key);
ctdb_db->persistent = persistent;
+ if (!ctdb_db->persistent) {
+ ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
+ if (ctdb_db->delete_queue == NULL) {
+ CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
+ }
+
+ ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
+ }
+
/* check for hash collisions */
for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
if (tmp_db->db_id == ctdb_db->db_id) {
if (ctdb->max_persistent_check_errors > 0) {
remaining_tries = 1;
}
- if (ctdb->done_startup) {
+ if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
remaining_tries = 0;
}
tdb_flags |= TDB_NOMMAP;
}
tdb_flags |= TDB_DISALLOW_NESTING;
+ if (jenkinshash) {
+ tdb_flags |= TDB_INCOMPATIBLE_HASH;
+ }
+#ifdef TDB_MUTEX_LOCKING
+ if (ctdb->tunable.mutex_enabled && mutexes &&
+ tdb_runtime_check_for_robust_mutexes()) {
+ tdb_flags |= TDB_MUTEX_LOCKING;
+ }
+#endif
again:
- ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
+ ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
ctdb->tunable.database_hash_size,
tdb_flags,
O_CREAT|O_RDWR, mode);
}
}
+ /* set up a rb tree we can use to track which records we have a
+ fetch-lock in-flight for so we can defer any additional calls
+ for the same record.
+ */
+ ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+ if (ctdb_db->deferred_fetch == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
DLIST_ADD(ctdb->db_list, ctdb_db);
/* setting this can help some high churn databases */
return -1;
}
+ /*
+ all databases support the "fetch_with_header" function. we need this
+ for efficient readonly record fetches
+ */
+ ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
+ if (ret != 0) {
+ DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
ret = ctdb_vacuum_init(ctdb_db);
if (ret != 0) {
DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
}
- DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
-
+ DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
+ ctdb_db->db_path, tdb_flags));
+
/* success */
return 0;
}
+/* State for a client DB-attach control deferred until recovery finishes.
+ * Linked into ctdb->deferred_attach; holds the original request packet. */
+struct ctdb_deferred_attach_context {
+ struct ctdb_deferred_attach_context *next, *prev;
+ struct ctdb_context *ctdb;
+ struct ctdb_req_control *c;
+};
+
+
+/* talloc destructor: unlink the deferred-attach entry from the
+ * ctdb->deferred_attach list when it is freed. */
+static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
+{
+ DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
+
+ return 0;
+}
+
+/* Timer callback: the deferred attach waited too long - fail the
+ * original control back to the client with status -1 and drop the
+ * deferred entry. */
+static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+{
+ struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
+ struct ctdb_context *ctdb = da_ctx->ctdb;
+
+ ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
+ talloc_free(da_ctx);
+}
+
+/* Timer callback: recovery is over - re-inject the saved attach request
+ * into the normal packet-processing path so it is handled afresh. */
+static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+{
+ struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
+ struct ctdb_context *ctdb = da_ctx->ctdb;
+
+ /* This talloc-steals the packet ->c */
+ ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
+ talloc_free(da_ctx);
+}
+
+/**
+ * Re-schedule every deferred DB-attach request for processing.
+ *
+ * Each entry is moved off the deferred list and given a one-second
+ * timer that replays the original packet via
+ * ctdb_deferred_attach_callback(). Always returns 0.
+ */
+int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
+{
+ struct ctdb_deferred_attach_context *da_ctx;
+
+ /* call it from the main event loop as soon as the current event
+ finishes.
+ */
+ while ((da_ctx = ctdb->deferred_attach) != NULL) {
+ DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
+ event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+ }
+
+ return 0;
+}
+
/*
a client has asked to attach a new database
*/
int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, uint64_t tdb_flags,
- bool persistent)
+ bool persistent, uint32_t client_id,
+ struct ctdb_req_control *c,
+ bool *async_reply)
{
const char *db_name = (const char *)indata.dptr;
struct ctdb_db_context *db;
struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
+ struct ctdb_client *client = NULL;
+ bool with_jenkinshash, with_mutexes;
+
+ if (ctdb->tunable.allow_client_db_attach == 0) {
+ DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
+ "AllowClientDBAccess == 0\n", db_name));
+ return -1;
+ }
+
+ /* dont allow any local clients to attach while we are in recovery mode
+ * except for the recovery daemon.
+ * allow all attach from the network since these are always from remote
+ * recovery daemons.
+ */
+ if (client_id != 0) {
+ client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+ }
+ if (client != NULL) {
+ /* If the node is inactive it is not part of the cluster
+ and we should not allow clients to attach to any
+ databases
+ */
+ if (node->flags & NODE_FLAGS_INACTIVE) {
+ DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
+ return -1;
+ }
+
+ if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+ client->pid != ctdb->recoverd_pid &&
+ ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
+ struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
+
+ if (da_ctx == NULL) {
+ DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
+ return -1;
+ }
+
+ da_ctx->ctdb = ctdb;
+ da_ctx->c = talloc_steal(da_ctx, c);
+ talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
+ DLIST_ADD(ctdb->deferred_attach, da_ctx);
+
+ event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
+
+ DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
+ *async_reply = true;
+ return 0;
+ }
+ }
/* the client can optionally pass additional tdb flags, but we
only allow a subset of those on the database in ctdb. Note
that tdb_flags is passed in via the (otherwise unused)
srvid to the attach control */
- tdb_flags &= TDB_NOSYNC;
-
- /* If the node is inactive it is not part of the cluster
- and we should not allow clients to attach to any
- databases
- */
- if (node->flags & NODE_FLAGS_INACTIVE) {
- DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
- return -1;
- }
-
+#ifdef TDB_MUTEX_LOCKING
+ tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING);
+#else
+ tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
+#endif
/* see if we already have this name */
db = ctdb_db_handle(ctdb, db_name);
if (db) {
+ if (db->persistent != persistent) {
+ DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
+ "database %s\n", persistent ? "" : "non-",
+ db-> persistent ? "" : "non-", db_name));
+ return -1;
+ }
outdata->dptr = (uint8_t *)&db->db_id;
outdata->dsize = sizeof(db->db_id);
tdb_add_flags(db->ltdb->tdb, tdb_flags);
return 0;
}
- if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
+ with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
+#ifdef TDB_MUTEX_LOCKING
+ with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
+#else
+ with_mutexes = false;
+#endif
+
+ if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
+ with_jenkinshash, with_mutexes) != 0) {
return -1;
}
outdata->dsize = sizeof(db->db_id);
/* Try to ensure it's locked in mem */
- ctdb_lockdown_memory(ctdb);
+ lockdown_memory(ctdb->valgrinding);
/* tell all the other nodes about this database */
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
CTDB_CONTROL_DB_ATTACH,
0, CTDB_CTRL_FLAG_NOREPLY,
return 0;
}
+/*
+ * a client has asked to detach from a database
+ *
+ * indata carries the uint32 db_id. A control arriving from a local
+ * client (client_id != 0) is only broadcast to all nodes; the actual
+ * detach happens when the broadcast control (client_id == 0) arrives
+ * from the daemons. Returns 0 on success, -1 on refusal/error.
+ */
+int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
+ uint32_t client_id)
+{
+ uint32_t db_id;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_client *client = NULL;
+
+ db_id = *(uint32_t *)indata.dptr;
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
+ db_id));
+ return -1;
+ }
+
+ /* detach only makes sense when clients are barred from attaching,
+ otherwise a client could simply re-attach immediately */
+ if (ctdb->tunable.allow_client_db_attach == 1) {
+ DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
+ "Clients are allowed access to databases "
+ "(AllowClientDBAccess == 1)\n",
+ ctdb_db->db_name));
+ return -1;
+ }
+
+ if (ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
+ "denied\n", ctdb_db->db_name));
+ return -1;
+ }
+
+ /* Cannot detach from database when in recovery */
+ if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+ DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
+ return -1;
+ }
+
+ /* If a control comes from a client, then broadcast it to all nodes.
+ * Do the actual detach only if the control comes from other daemons.
+ */
+ if (client_id != 0) {
+ client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+ if (client != NULL) {
+ /* forward the control to all the nodes */
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+ CTDB_CONTROL_DB_DETACH, 0,
+ CTDB_CTRL_FLAG_NOREPLY,
+ indata, NULL, NULL);
+ return 0;
+ }
+ DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
+ "for database '%s'\n", ctdb_db->db_name));
+ return -1;
+ }
+
+ /* Detach database from recoverd */
+ if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
+ CTDB_SRVID_DETACH_DATABASE,
+ indata) != 0) {
+ DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
+ return -1;
+ }
+
+ /* Disable vacuuming and drop all vacuuming data */
+ talloc_free(ctdb_db->vacuum_handle);
+ talloc_free(ctdb_db->delete_queue);
+
+ /* Terminate any deferred fetch */
+ talloc_free(ctdb_db->deferred_fetch);
+
+ /* Terminate any traverses */
+ while (ctdb_db->traverse) {
+ talloc_free(ctdb_db->traverse);
+ }
+
+ /* Terminate any revokes */
+ while (ctdb_db->revokechild_active) {
+ talloc_free(ctdb_db->revokechild_active);
+ }
+
+ /* Free readonly tracking database */
+ if (ctdb_db->readonly) {
+ talloc_free(ctdb_db->rottdb);
+ }
+
+ DLIST_REMOVE(ctdb->db_list, ctdb_db);
+
+ DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
+ ctdb_db->db_name));
+ talloc_free(ctdb_db);
+
+ return 0;
+}
/*
attach to all existing persistent databases
int invalid_name = 0;
s = talloc_strdup(ctdb, de->d_name);
- CTDB_NO_MEMORY(ctdb, s);
+ if (s == NULL) {
+ closedir(d);
+ CTDB_NO_MEMORY(ctdb, s);
+ }
/* only accept names ending in .tdb */
p = strstr(s, ".tdb.");
}
p[4] = 0;
- if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
+ if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
closedir(d);
talloc_free(s);
char *unhealthy_reason = NULL;
bool first_try = true;
- if (ctdb->db_directory == NULL) {
- ctdb->db_directory = VARDIR "/ctdb";
- }
- if (ctdb->db_directory_persistent == NULL) {
- ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
- }
- if (ctdb->db_directory_state == NULL) {
- ctdb->db_directory_state = VARDIR "/ctdb/state";
- }
-
- /* make sure the db directory exists */
- ret = mkdir(ctdb->db_directory, 0700);
- if (ret == -1 && errno != EEXIST) {
- DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
- ctdb->db_directory));
- return -1;
- }
-
- /* make sure the persistent db directory exists */
- ret = mkdir(ctdb->db_directory_persistent, 0700);
- if (ret == -1 && errno != EEXIST) {
- DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
- ctdb->db_directory_persistent));
- return -1;
- }
-
- /* make sure the internal state db directory exists */
- ret = mkdir(ctdb->db_directory_state, 0700);
- if (ret == -1 && errno != EEXIST) {
- DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
- ctdb->db_directory_state));
- return -1;
- }
-
persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
ctdb->db_directory_state,
PERSISTENT_HEALTH_TDB,
return 0;
}
-int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
+int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
+ uint32_t client_id)
{
struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
struct ctdb_db_context *ctdb_db;
ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
if (!ctdb_db) {
- DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
- return -1;
+ if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
+ DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
+ db_prio->db_id));
+ }
+ return 0;
}
if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
- return -1;
+ return 0;
}
ctdb_db->priority = db_prio->priority;
DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
+ if (client_id != 0) {
+ /* Broadcast the update to the rest of the cluster */
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+ CTDB_CONTROL_SET_DB_PRIORITY, 0,
+ CTDB_CTRL_FLAG_NOREPLY, indata,
+ NULL, NULL);
+ }
return 0;
}
+/**
+ * Enable the sticky-record property on a database, creating the
+ * rb-tree used to track sticky records.
+ *
+ * Returns 0 on success (or if already set), -1 on error.
+ * Persistent databases are rejected.
+ */
+int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+ if (ctdb_db->sticky) {
+ return 0;
+ }
+
+ if (ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+ return -1;
+ }
+
+ ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
+ /* trbt_create() can fail on OOM - check like the other call sites do */
+ if (ctdb_db->sticky_records == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to create sticky record tree for db %s\n", ctdb_db->db_name));
+ return -1;
+ }
+
+ ctdb_db->sticky = true;
+
+ DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
+ return 0;
+}
+
+/**
+ * Marshal the statistics of a database into outdata.
+ *
+ * Wire format: a struct ctdb_db_statistics up to the hot_keys_wire
+ * member, followed by the MAX_HOT_KEYS hot-key blobs concatenated
+ * back to back (their dsizes are carried inside the copied struct).
+ * The buffer is talloc-allocated off outdata; caller owns it.
+ * Returns 0 on success, -1 on unknown db_id or OOM.
+ */
+int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
+ uint32_t db_id,
+ TDB_DATA *outdata)
+{
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_db_statistics *stats;
+ int i;
+ int len;
+ char *ptr;
+
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (!ctdb_db) {
+ DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
+ return -1;
+ }
+
+ /* fixed-size prefix plus the variable-length hot keys */
+ len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
+ for (i = 0; i < MAX_HOT_KEYS; i++) {
+ len += ctdb_db->statistics.hot_keys[i].key.dsize;
+ }
+
+ stats = talloc_size(outdata, len);
+ if (stats == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
+ return -1;
+ }
+
+ /* struct copy; the embedded hot_keys[].key.dptr pointers are
+ meaningless on the wire - the key data follows below */
+ *stats = ctdb_db->statistics;
+
+ stats->num_hot_keys = MAX_HOT_KEYS;
+
+ ptr = &stats->hot_keys_wire[0];
+ for (i = 0; i < MAX_HOT_KEYS; i++) {
+ memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
+ ctdb_db->statistics.hot_keys[i].key.dsize);
+ ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
+ }
+
+ outdata->dptr = (uint8_t *)stats;
+ outdata->dsize = len;
+
+ return 0;
+}