*/
#include "includes.h"
-#include "lib/tevent/tevent.h"
#include "lib/tdb/include/tdb.h"
#include "system/network.h"
#include "system/filesys.h"
#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
-/*
- this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
- return 0;
-}
-
-/*
- this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
-{
- call->reply_data = &call->record_data;
- return 0;
-}
-
-
/**
* write a record to a normal database
*
* This is the server-variant of the ctdb_ltdb_store function.
* It contains logic to determine whether a record should be
- * stored or deleted.
+ * stored or deleted. It also sends SCHEDULE_FOR_DELETION
+ * controls to the local ctdb daemon if appropriate.
*/
static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
TDB_DATA key,
int ret;
bool seqnum_suppressed = false;
bool keep = false;
+ bool schedule_for_deletion = false;
+ bool remove_from_delete_queue = false;
uint32_t lmaster;
if (ctdb->flags & CTDB_FLAG_TORTURE) {
*/
if (data.dsize != 0) {
keep = true;
+ } else if (header->flags & CTDB_REC_RO_FLAGS) {
+ keep = true;
} else if (ctdb_db->persistent) {
keep = true;
+ } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
+ /*
+ * The record is not created by the client but
+ * automatically by the ctdb_ltdb_fetch logic that
+ * creates a record with an initial header in the
+ * ltdb before trying to migrate the record from
+ * the current lmaster. Keep it instead of trying
+ * to delete the non-existing record...
+ */
+ keep = true;
+ schedule_for_deletion = true;
} else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
keep = true;
} else if (ctdb_db->ctdb->pnn == lmaster) {
keep = true;
}
+ if (keep) {
+ if (!ctdb_db->persistent &&
+ (ctdb_db->ctdb->pnn == header->dmaster) &&
+ !(header->flags & CTDB_REC_RO_FLAGS))
+ {
+ header->rsn++;
+
+ if (data.dsize == 0) {
+ schedule_for_deletion = true;
+ }
+ }
+ remove_from_delete_queue = !schedule_for_deletion;
+ }
+
store:
/*
* The VACUUM_MIGRATED flag is only set temporarily for
*/
header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
+ /*
+ * Similarly, clear the AUTOMATIC flag which should not enter
+ * the local database copy since this would require client
+ * modifications to clear the flag when the client stores
+ * the record.
+ */
+ header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
+
rec.dsize = sizeof(*header) + data.dsize;
rec.dptr = talloc_size(ctdb, rec.dsize);
CTDB_NO_MEMORY(ctdb, rec.dptr);
ctdb_db->db_name,
keep?"store":"delete", ret,
tdb_errorstr(ctdb_db->ltdb->tdb)));
+
+ schedule_for_deletion = false;
+ remove_from_delete_queue = false;
}
if (seqnum_suppressed) {
tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
talloc_free(rec.dptr);
+ if (schedule_for_deletion) {
+ int ret2;
+ ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
+ if (ret2 != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
+ }
+ }
+
+ if (remove_from_delete_queue) {
+ ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
+ }
+
return ret;
}
/*
called when we should retry the operation
*/
-static void lock_fetch_callback(void *p)
+static void lock_fetch_callback(void *p, bool locked)
{
struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
if (!state->ignore_generation &&
1) tries to get the chainlock. If it succeeds, then it returns 0
2) if it fails to get a chainlock immediately then it sets up a
- non-blocking chainlock via ctdb_lockwait, and when it gets the
+ non-blocking chainlock via ctdb_lock_record, and when it gets the
chainlock it re-submits this ctdb request to the main packet
- receive function
+ receive function.
This effectively queues all ctdb requests that cannot be
immediately satisfied until it can get the lock. This means that
{
int ret;
struct tdb_context *tdb = ctdb_db->ltdb->tdb;
- struct lockwait_handle *h;
+ struct lock_request *lreq;
struct lock_fetch_state *state;
ret = tdb_chainlock_nonblock(tdb, key);
state->ignore_generation = ignore_generation;
/* now the contended path */
- h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
- if (h == NULL) {
+ lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
+ if (lreq == NULL) {
return -1;
}
/* we need to move the packet off the temporary context in ctdb_input_pkt(),
so it won't be freed yet */
talloc_steal(state, hdr);
- talloc_steal(state, h);
/* now tell the caller than we will retry asynchronously */
return -2;
return -1;
}
- if (may_recover && !ctdb->done_startup) {
+ if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
ctdb_db->db_name));
ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
return 0;
}
+
+/**
+ * Enable the read-only (delegations) property on a database.
+ *
+ * Opens (creating it if necessary) the node-local tracking tdb
+ * "<db_path>.RO" and records the handle in ctdb_db->rottdb, then marks
+ * the database readonly-capable.  Idempotent: returns 0 immediately if
+ * the property is already set.  Refused for persistent databases.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+	char *ropath;
+
+	/* already enabled - nothing to do */
+	if (ctdb_db->readonly) {
+		return 0;
+	}
+
+	/* read-only delegations make no sense for persistent databases */
+	if (ctdb_db->persistent) {
+		DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
+		return -1;
+	}
+
+	ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
+	if (ropath == NULL) {
+		DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
+		return -1;
+	}
+	/* the tracking db is scratch state: unlocked, unsynced and wiped
+	 * on first (re)open.
+	 * NOTE(review): created with file mode 0 - confirm this is intended
+	 * (the daemon keeps the fd; the file is not meant to be reopened). */
+	ctdb_db->rottdb = tdb_open(ropath,
+			ctdb->tunable.database_hash_size,
+			TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
+			O_CREAT|O_RDWR, 0);
+	if (ctdb_db->rottdb == NULL) {
+		DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
+		talloc_free(ropath);
+		return -1;
+	}
+
+	DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
+
+	ctdb_db->readonly = true;
+	talloc_free(ropath);
+	return 0;
+}
+
/*
attach to a database, handling both persistent and non-persistent databases
return 0 on success, -1 on failure
if (ctdb->max_persistent_check_errors > 0) {
remaining_tries = 1;
}
- if (ctdb->done_startup) {
+ if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
remaining_tries = 0;
}
}
}
+ /* set up a rb tree we can use to track which records we have a
+ fetch-lock in-flight for so we can defer any additional calls
+ for the same record.
+ */
+ ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+ if (ctdb_db->deferred_fetch == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
DLIST_ADD(ctdb->db_list, ctdb_db);
/* setting this can help some high churn databases */
return -1;
}
+ /*
+ all databases support the "fetch_with_header" function. we need this
+ for efficient readonly record fetches
+ */
+ ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
+ if (ret != 0) {
+ DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
ret = ctdb_vacuum_init(ctdb_db);
if (ret != 0) {
DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
*/
while ((da_ctx = ctdb->deferred_attach) != NULL) {
DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
- event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+ event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
}
return 0;
struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
struct ctdb_client *client = NULL;
+ if (ctdb->tunable.allow_client_db_attach == 0) {
+ DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
+ "AllowClientDBAccess == 0\n", db_name));
+ return -1;
+ }
+
/* dont allow any local clients to attach while we are in recovery mode
* except for the recovery daemon.
* allow all attach from the network since these are always from remote
databases
*/
if (node->flags & NODE_FLAGS_INACTIVE) {
- DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+ DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
return -1;
}
- if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
- && client->pid != ctdb->recoverd_pid) {
+ if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+ client->pid != ctdb->recoverd_pid &&
+ ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
if (da_ctx == NULL) {
/* see if we already have this name */
db = ctdb_db_handle(ctdb, db_name);
if (db) {
+ if (db->persistent != persistent) {
+ DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
+ "database %s\n", persistent ? "" : "non-",
+ db-> persistent ? "" : "non-", db_name));
+ return -1;
+ }
outdata->dptr = (uint8_t *)&db->db_id;
outdata->dsize = sizeof(db->db_id);
tdb_add_flags(db->ltdb->tdb, tdb_flags);
return 0;
}
+
+/**
+ * Enable the "sticky records" property on a database.
+ *
+ * Creates the rb-tree used to track sticky (pinned) records and marks
+ * the database sticky.  Idempotent: returns 0 immediately if the
+ * property is already set.  Refused for persistent databases.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+	DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
+	/* already enabled - nothing to do */
+	if (ctdb_db->sticky) {
+		return 0;
+	}
+
+	if (ctdb_db->persistent) {
+		DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+		return -1;
+	}
+
+	ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
+	/* bug fix: the allocation was previously unchecked - on failure we
+	 * would have marked the db sticky with a NULL record tree. */
+	if (ctdb_db->sticky_records == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to create sticky record tree for db '%s'\n",
+				 ctdb_db->db_name));
+		return -1;
+	}
+
+	ctdb_db->sticky = true;
+
+	return 0;
+}
+
+/**
+ * Marshall the per-database statistics for db_id into wire format.
+ *
+ * The wire blob is a fixed header followed by MAX_HOT_KEYS variable
+ * length entries, each: u32 count, u32 key length, key bytes.
+ * The blob is talloc-allocated on outdata and returned via
+ * outdata->dptr/dsize.
+ *
+ * Returns 0 on success, -1 on failure (unknown db_id or out of memory).
+ */
+int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
+				uint32_t db_id,
+				TDB_DATA *outdata)
+{
+	struct ctdb_db_context *ctdb_db;
+	struct ctdb_db_statistics_wire *stats;
+	int i;
+	int len;
+	char *ptr;
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (!ctdb_db) {
+		DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
+		return -1;
+	}
+
+	/* fixed header plus (u32 count, u32 dsize, key bytes) per hot key */
+	len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
+	for (i = 0; i < MAX_HOT_KEYS; i++) {
+		len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
+	}
+
+	/* bug fix: zero the buffer - talloc_size leaves it uninitialized,
+	 * so any wire field not explicitly set below (and struct padding)
+	 * would leak heap contents to the client. */
+	stats = talloc_zero_size(outdata, len);
+	if (stats == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
+		return -1;
+	}
+
+	stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
+	stats->db_ro_revokes = ctdb_db->statistics.db_ro_revokes;
+	for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
+		stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
+	}
+	stats->num_hot_keys = MAX_HOT_KEYS;
+
+	ptr = &stats->hot_keys[0];
+	for (i = 0; i < MAX_HOT_KEYS; i++) {
+		uint32_t tmp;
+
+		/* bug fix: use memcpy instead of *(uint32_t *)ptr - after a
+		 * variable-length key the cursor is not 4-byte aligned, and
+		 * a misaligned store is undefined behaviour (traps on
+		 * strict-alignment platforms). */
+		tmp = ctdb_db->statistics.hot_keys[i].count;
+		memcpy(ptr, &tmp, sizeof(tmp));
+		ptr += 4;
+
+		tmp = ctdb_db->statistics.hot_keys[i].key.dsize;
+		memcpy(ptr, &tmp, sizeof(tmp));
+		ptr += 4;
+
+		memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
+		       ctdb_db->statistics.hot_keys[i].key.dsize);
+		ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
+	}
+
+	outdata->dptr = (uint8_t *)stats;
+	outdata->dsize = len;
+
+	return 0;
+}