ctdb-locking: Talloc lock request from client specified context
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_ltdb_server.c
index 4f77934f0d83852b49a45e89943bf79107e66848..4b41542b58d2d27f5da31f79942f3a485e8f0eb4 100644 (file)
@@ -18,7 +18,7 @@
 */
 
 #include "includes.h"
-#include "lib/tdb/include/tdb.h"
+#include "tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/dir.h"
@@ -328,7 +328,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
        state->ignore_generation = ignore_generation;
 
        /* now the contended path */
-       lreq = ctdb_lock_record(ctdb_db, key, true, lock_fetch_callback, state);
+       lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
        if (lreq == NULL) {
                return -1;
        }
@@ -656,7 +656,7 @@ int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
                return -1;
        }
 
-       if (may_recover && !ctdb->done_startup) {
+       if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
                DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
                                  ctdb_db->db_name));
                ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
@@ -706,7 +706,7 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
        }
 
        if (ctdb_db->persistent) {
-               DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
+               DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
                return -1;
        }
 
@@ -728,6 +728,9 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
        DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
 
        ctdb_db->readonly = true;
+
+       DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
+
        talloc_free(ropath);
        return 0;
 }
@@ -738,7 +741,7 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
  */
 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                             bool persistent, const char *unhealthy_reason,
-                            bool jenkinshash)
+                            bool jenkinshash, bool mutexes)
 {
        struct ctdb_db_context *ctdb_db, *tmp_db;
        int ret;
@@ -794,7 +797,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                if (ctdb->max_persistent_check_errors > 0) {
                        remaining_tries = 1;
                }
-               if (ctdb->done_startup) {
+               if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
                        remaining_tries = 0;
                }
 
@@ -833,9 +836,15 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        if (jenkinshash) {
                tdb_flags |= TDB_INCOMPATIBLE_HASH;
        }
+#ifdef TDB_MUTEX_LOCKING
+       if (ctdb->tunable.mutex_enabled && mutexes &&
+           tdb_runtime_check_for_robust_mutexes()) {
+               tdb_flags |= TDB_MUTEX_LOCKING;
+       }
+#endif
 
 again:
-       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
+       ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
                                      ctdb->tunable.database_hash_size, 
                                      tdb_flags, 
                                      O_CREAT|O_RDWR, mode);
@@ -993,8 +1002,9 @@ again:
        }
 
 
-       DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
-       
+       DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
+                           ctdb_db->db_path, tdb_flags));
+
        /* success */
        return 0;
 }
@@ -1061,6 +1071,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
        struct ctdb_client *client = NULL;
+       bool with_jenkinshash, with_mutexes;
 
        if (ctdb->tunable.allow_client_db_attach == 0) {
                DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
@@ -1082,13 +1093,13 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                   databases
                */
                if (node->flags & NODE_FLAGS_INACTIVE) {
-                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
                        return -1;
                }
 
-               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
-                && client->pid != ctdb->recoverd_pid
-                && !ctdb->done_startup) {
+               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+                   client->pid != ctdb->recoverd_pid &&
+                   ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
                        struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
 
                        if (da_ctx == NULL) {
@@ -1113,7 +1124,11 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
           only allow a subset of those on the database in ctdb. Note
           that tdb_flags is passed in via the (otherwise unused)
           srvid to the attach control */
+#ifdef TDB_MUTEX_LOCKING
+       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING);
+#else
        tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
+#endif
 
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
@@ -1130,7 +1145,15 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return 0;
        }
 
-       if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
+       with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
+#ifdef TDB_MUTEX_LOCKING
+       with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
+#else
+       with_mutexes = false;
+#endif
+
+       if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
+                             with_jenkinshash, with_mutexes) != 0) {
                return -1;
        }
 
@@ -1147,7 +1170,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        outdata->dsize = sizeof(db->db_id);
 
        /* Try to ensure it's locked in mem */
-       ctdb_lockdown_memory(ctdb);
+       lockdown_memory(ctdb->valgrinding);
 
        /* tell all the other nodes about this database */
        ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
@@ -1160,6 +1183,100 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        return 0;
 }
 
+/* A client has asked to detach from a database: client requests are only
+ * broadcast to all nodes; the detach itself runs when client_id == 0.
+ * Returns 0 on success (or once forwarded), -1 if refused or on failure. */
+int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
+                              uint32_t client_id)
+{
+       uint32_t db_id;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_client *client = NULL;
+
+       db_id = *(uint32_t *)indata.dptr; /* indata.dsize is not checked here */
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
+                                 db_id));
+               return -1;
+       }
+
+       if (ctdb->tunable.allow_client_db_attach == 1) {
+               DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
+                                 "Clients are allowed access to databases "
+                                 "(AllowClientDBAccess == 1)\n",
+                                 ctdb_db->db_name));
+               return -1;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
+                                 "denied\n", ctdb_db->db_name));
+               return -1;
+       }
+
+       /* Cannot detach from database when in recovery */
+       if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+               DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
+               return -1;
+       }
+
+       /* If a control comes from a client, then broadcast it to all nodes.
+        * Do the actual detach only if the control comes from other daemons.
+        */
+       if (client_id != 0) {
+               client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+               if (client != NULL) {
+                       /* forward the control to all the nodes */
+                       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+                                                CTDB_CONTROL_DB_DETACH, 0,
+                                                CTDB_CTRL_FLAG_NOREPLY,
+                                                indata, NULL, NULL);
+                       return 0;
+               }
+               DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
+                                 "for database '%s'\n", ctdb_db->db_name));
+               return -1;
+       }
+
+       /* Detach database from recoverd (message addressed to ctdb->pnn) */
+       if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
+                                    CTDB_SRVID_DETACH_DATABASE,
+                                    indata) != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
+               return -1;
+       }
+
+       /* Disable vacuuming and drop all vacuuming data */
+       talloc_free(ctdb_db->vacuum_handle);
+       talloc_free(ctdb_db->delete_queue);
+
+       /* Terminate any deferred fetch */
+       talloc_free(ctdb_db->deferred_fetch);
+
+       /* Terminate any traverses (assumes each free unlinks the list head) */
+       while (ctdb_db->traverse) {
+               talloc_free(ctdb_db->traverse);
+       }
+
+       /* Terminate any revokes (assumes each free unlinks the list head) */
+       while (ctdb_db->revokechild_active) {
+               talloc_free(ctdb_db->revokechild_active);
+       }
+
+       /* Free readonly tracking database */
+       if (ctdb_db->readonly) {
+               talloc_free(ctdb_db->rottdb);
+       }
+
+       DLIST_REMOVE(ctdb->db_list, ctdb_db);
+
+       DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
+                            ctdb_db->db_name));
+       talloc_free(ctdb_db);
+
+       return 0;
+}
 
 /*
   attach to all existing persistent databases
@@ -1183,7 +1300,10 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                int invalid_name = 0;
                
                s = talloc_strdup(ctdb, de->d_name);
-               CTDB_NO_MEMORY(ctdb, s);
+               if (s == NULL) {
+                       closedir(d);
+                       CTDB_NO_MEMORY(ctdb, s);
+               }
 
                /* only accept names ending in .tdb */
                p = strstr(s, ".tdb.");
@@ -1206,7 +1326,7 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                }
                p[4] = 0;
 
-               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
+               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
                        closedir(d);
                        talloc_free(s);
@@ -1228,40 +1348,6 @@ int ctdb_attach_databases(struct ctdb_context *ctdb)
        char *unhealthy_reason = NULL;
        bool first_try = true;
 
-       if (ctdb->db_directory == NULL) {
-               ctdb->db_directory = VARDIR "/ctdb";
-       }
-       if (ctdb->db_directory_persistent == NULL) {
-               ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
-       }
-       if (ctdb->db_directory_state == NULL) {
-               ctdb->db_directory_state = VARDIR "/ctdb/state";
-       }
-
-       /* make sure the db directory exists */
-       ret = mkdir(ctdb->db_directory, 0700);
-       if (ret == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
-                        ctdb->db_directory));
-               return -1;
-       }
-
-       /* make sure the persistent db directory exists */
-       ret = mkdir(ctdb->db_directory_persistent, 0700);
-       if (ret == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
-                        ctdb->db_directory_persistent));
-               return -1;
-       }
-
-       /* make sure the internal state db directory exists */
-       ret = mkdir(ctdb->db_directory_state, 0700);
-       if (ret == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
-                        ctdb->db_directory_state));
-               return -1;
-       }
-
        persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
                                                 ctdb->db_directory_state,
                                                 PERSISTENT_HEALTH_TDB,
@@ -1452,14 +1538,18 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
        return 0;
 }
 
-int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
+int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
+                                    uint32_t client_id)
 {
        struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
        struct ctdb_db_context *ctdb_db;
 
        ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
        if (!ctdb_db) {
-               DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
+               if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
+                       DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
+                                        db_prio->db_id));
+               }
                return 0;
        }
 
@@ -1471,15 +1561,19 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
        ctdb_db->priority = db_prio->priority;
        DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
 
+       if (client_id != 0) {
+               /* Broadcast the update to the rest of the cluster */
+               ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+                                        CTDB_CONTROL_SET_DB_PRIORITY, 0,
+                                        CTDB_CTRL_FLAG_NOREPLY, indata,
+                                        NULL, NULL);
+       }
        return 0;
 }
 
 
 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
 {
-
-       DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
-
        if (ctdb_db->sticky) {
                return 0;
        }
@@ -1493,6 +1587,8 @@ int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_d
 
        ctdb_db->sticky = true;
 
+       DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
        return 0;
 }
 
@@ -1501,7 +1597,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
                                TDB_DATA *outdata)
 {
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_db_statistics_wire *stats;
+       struct ctdb_db_statistics *stats;
        int i;
        int len;
        char *ptr;
@@ -1512,33 +1608,25 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
                return -1;
        }
 
-       len = offsetof(struct ctdb_db_statistics_wire, hot_keys);
+       len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
        for (i = 0; i < MAX_HOT_KEYS; i++) {
-               len += 8 + ctdb_db->statistics.hot_keys[i].key.dsize;
+               len += ctdb_db->statistics.hot_keys[i].key.dsize;
        }
 
        stats = talloc_size(outdata, len);
        if (stats == NULL) {
-               DEBUG(DEBUG_ERR,("Failed to allocate db statistics wire structure\n"));
+               DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
                return -1;
        }
 
-       stats->db_ro_delegations = ctdb_db->statistics.db_ro_delegations;
-       stats->db_ro_revokes     = ctdb_db->statistics.db_ro_revokes;
-       for (i = 0; i < MAX_COUNT_BUCKETS; i++) {
-               stats->hop_count_bucket[i] = ctdb_db->statistics.hop_count_bucket[i];
-       }
+       *stats = ctdb_db->statistics;
+
        stats->num_hot_keys = MAX_HOT_KEYS;
 
-       ptr = &stats->hot_keys[0];
+       ptr = &stats->hot_keys_wire[0];
        for (i = 0; i < MAX_HOT_KEYS; i++) {
-               *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].count;
-               ptr += 4;
-
-               *(uint32_t *)ptr = ctdb_db->statistics.hot_keys[i].key.dsize;
-               ptr += 4;
-
-               memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr, ctdb_db->statistics.hot_keys[i].key.dsize);
+               memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
+                      ctdb_db->statistics.hot_keys[i].key.dsize);
                ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
        }