2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/network.h"
23 #include "system/filesys.h"
24 #include "system/dir.h"
25 #include "system/time.h"
26 #include "../include/ctdb_private.h"
27 #include "../common/rb_tree.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/dlinklist.h"
31 #include "common/reqid.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 * write a record to a normal database
38 * This is the server-variant of the ctdb_ltdb_store function.
39 * It contains logic to determine whether a record should be
40 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41 * controls to the local ctdb daemon if apporpriate.
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
45 struct ctdb_ltdb_header *header,
48 struct ctdb_context *ctdb = ctdb_db->ctdb;
51 bool seqnum_suppressed = false;
53 bool schedule_for_deletion = false;
54 bool remove_from_delete_queue = false;
57 if (ctdb->flags & CTDB_FLAG_TORTURE) {
58 struct ctdb_ltdb_header *h2;
59 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
60 h2 = (struct ctdb_ltdb_header *)rec.dptr;
61 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
62 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
63 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
65 if (rec.dptr) free(rec.dptr);
68 if (ctdb->vnn_map == NULL) {
70 * Called from a client: always store the record
71 * Also don't call ctdb_lmaster since it uses the vnn_map!
77 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
80 * If we migrate an empty record off to another node
81 * and the record has not been migrated with data,
82 * delete the record instead of storing the empty record.
84 if (data.dsize != 0) {
86 } else if (header->flags & CTDB_REC_RO_FLAGS) {
88 } else if (ctdb_db->persistent) {
90 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
92 * The record is not created by the client but
93 * automatically by the ctdb_ltdb_fetch logic that
94 * creates a record with an initial header in the
95 * ltdb before trying to migrate the record from
96 * the current lmaster. Keep it instead of trying
97 * to delete the non-existing record...
100 schedule_for_deletion = true;
101 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
103 } else if (ctdb_db->ctdb->pnn == lmaster) {
105 * If we are lmaster, then we usually keep the record.
106 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
107 * and the record is empty and has never been migrated
108 * with data, then we should delete it instead of storing it.
109 * This is part of the vacuuming process.
111 * The reason that we usually need to store even empty records
112 * on the lmaster is that a client operating directly on the
113 * lmaster (== dmaster) expects the local copy of the record to
114 * exist after successful ctdb migrate call. If the record does
115 * not exist, the client goes into a migrate loop and eventually
116 * fails. So storing the empty record makes sure that we do not
117 * need to change the client code.
119 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
121 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
124 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
129 if (!ctdb_db->persistent &&
130 (ctdb_db->ctdb->pnn == header->dmaster) &&
131 !(header->flags & CTDB_REC_RO_FLAGS))
135 if (data.dsize == 0) {
136 schedule_for_deletion = true;
139 remove_from_delete_queue = !schedule_for_deletion;
144 * The VACUUM_MIGRATED flag is only set temporarily for
145 * the above logic when the record was retrieved by a
146 * VACUUM_MIGRATE call and should not be stored in the
149 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
150 * and there are two cases in which the corresponding record
151 * is stored in the local database:
152 * 1. The record has been migrated with data in the past
153 * (the MIGRATED_WITH_DATA record flag is set).
154 * 2. The record has been filled with data again since it
155 * had been submitted in the VACUUM_FETCH message to the
157 * For such records it is important to not store the
158 * VACUUM_MIGRATED flag in the database.
160 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
163 * Similarly, clear the AUTOMATIC flag which should not enter
164 * the local database copy since this would require client
165 * modifications to clear the flag when the client stores
168 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
170 rec.dsize = sizeof(*header) + data.dsize;
171 rec.dptr = talloc_size(ctdb, rec.dsize);
172 CTDB_NO_MEMORY(ctdb, rec.dptr);
174 memcpy(rec.dptr, header, sizeof(*header));
175 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
177 /* Databases with seqnum updates enabled only get their seqnum
178 changes when/if we modify the data */
179 if (ctdb_db->seqnum_update != NULL) {
181 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
183 if ( (old.dsize == rec.dsize)
184 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
185 rec.dptr+sizeof(struct ctdb_ltdb_header),
186 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
187 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
188 seqnum_suppressed = true;
190 if (old.dptr) free(old.dptr);
193 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
195 keep?"storing":"deleting",
199 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
201 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
208 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
213 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
216 keep?"store":"delete", ret,
217 tdb_errorstr(ctdb_db->ltdb->tdb)));
219 schedule_for_deletion = false;
220 remove_from_delete_queue = false;
222 if (seqnum_suppressed) {
223 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
226 talloc_free(rec.dptr);
228 if (schedule_for_deletion) {
230 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
232 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
236 if (remove_from_delete_queue) {
237 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State carried while a request waits for a record chainlock, so that
 * the packet can be handed back to its receive function afterwards.
 */
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* receive function to re-submit the packet to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* database generation at the time the request was queued */
	uint32_t generation;
	/* when true, requeue even if the generation changed meanwhile */
	bool ignore_generation;
};
254 called when we should retry the operation
256 static void lock_fetch_callback(void *p, bool locked)
258 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
259 if (!state->ignore_generation &&
260 state->generation != state->ctdb_db->generation) {
261 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
262 talloc_free(state->hdr);
265 state->recv_pkt(state->recv_context, state->hdr);
266 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
271 do a non-blocking ltdb_lock, deferring this ctdb request until we
274 It does the following:
276 1) tries to get the chainlock. If it succeeds, then it returns 0
278 2) if it fails to get a chainlock immediately then it sets up a
279 non-blocking chainlock via ctdb_lock_record, and when it gets the
280 chainlock it re-submits this ctdb request to the main packet
283 This effectively queues all ctdb requests that cannot be
284 immediately satisfied until it can get the lock. This means that
285 the main ctdb daemon will not block waiting for a chainlock held by
288 There are 3 possible return values:
290 0: means that it got the lock immediately.
291 -1: means that it failed to get the lock, and won't retry
292 -2: means that it failed to get the lock immediately, but will retry
294 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
295 TDB_DATA key, struct ctdb_req_header *hdr,
296 void (*recv_pkt)(void *, struct ctdb_req_header *),
297 void *recv_context, bool ignore_generation)
300 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
301 struct lock_request *lreq;
302 struct lock_fetch_state *state;
304 ret = tdb_chainlock_nonblock(tdb, key);
307 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
308 /* a hard failure - don't try again */
312 /* when torturing, ensure we test the contended path */
313 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
316 tdb_chainunlock(tdb, key);
319 /* first the non-contended path */
324 state = talloc(hdr, struct lock_fetch_state);
325 state->ctdb = ctdb_db->ctdb;
326 state->ctdb_db = ctdb_db;
328 state->recv_pkt = recv_pkt;
329 state->recv_context = recv_context;
330 state->generation = ctdb_db->generation;
331 state->ignore_generation = ignore_generation;
333 /* now the contended path */
334 lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
339 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
340 so it won't be freed yet */
341 talloc_steal(state, hdr);
343 /* now tell the caller than we will retry asynchronously */
348 a varient of ctdb_ltdb_lock_requeue that also fetches the record
350 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
351 TDB_DATA key, struct ctdb_ltdb_header *header,
352 struct ctdb_req_header *hdr, TDB_DATA *data,
353 void (*recv_pkt)(void *, struct ctdb_req_header *),
354 void *recv_context, bool ignore_generation)
358 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
359 recv_context, ignore_generation);
361 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
364 uret = ctdb_ltdb_unlock(ctdb_db, key);
366 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
375 paraoid check to see if the db is empty
377 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
379 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
380 int count = tdb_traverse_read(tdb, NULL, NULL);
382 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
384 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
388 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
389 struct ctdb_db_context *ctdb_db)
391 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
397 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
398 key.dsize = strlen(ctdb_db->db_name);
400 old = ctdb_db->unhealthy_reason;
401 ctdb_db->unhealthy_reason = NULL;
403 val = tdb_fetch(tdb, key);
405 reason = talloc_strndup(ctdb_db,
406 (const char *)val.dptr,
408 if (reason == NULL) {
409 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
411 ctdb_db->unhealthy_reason = old;
422 ctdb_db->unhealthy_reason = reason;
426 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
427 struct ctdb_db_context *ctdb_db,
428 const char *given_reason,/* NULL means healthy */
429 int num_healthy_nodes)
431 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
435 char *new_reason = NULL;
436 char *old_reason = NULL;
438 ret = tdb_transaction_start(tdb);
440 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
441 tdb_name(tdb), ret, tdb_errorstr(tdb)));
445 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
447 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
448 ctdb_db->db_name, ret));
451 old_reason = ctdb_db->unhealthy_reason;
453 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
454 key.dsize = strlen(ctdb_db->db_name);
457 new_reason = talloc_strdup(ctdb_db, given_reason);
458 if (new_reason == NULL) {
459 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
463 } else if (old_reason && num_healthy_nodes == 0) {
465 * If the reason indicates ok, but there where no healthy nodes
466 * available, that it means, we have not recovered valid content
467 * of the db. So if there's an old reason, prefix it with
468 * "NO-HEALTHY-NODES - "
472 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
473 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
475 prefix = _TMP_PREFIX;
479 new_reason = talloc_asprintf(ctdb_db, "%s%s",
481 if (new_reason == NULL) {
482 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
483 prefix, old_reason));
490 val.dptr = discard_const_p(uint8_t, new_reason);
491 val.dsize = strlen(new_reason);
493 ret = tdb_store(tdb, key, val, TDB_REPLACE);
495 tdb_transaction_cancel(tdb);
496 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
497 tdb_name(tdb), ctdb_db->db_name, new_reason,
498 ret, tdb_errorstr(tdb)));
499 talloc_free(new_reason);
502 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
503 ctdb_db->db_name, new_reason));
504 } else if (old_reason) {
505 ret = tdb_delete(tdb, key);
507 tdb_transaction_cancel(tdb);
508 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
509 tdb_name(tdb), ctdb_db->db_name,
510 ret, tdb_errorstr(tdb)));
511 talloc_free(new_reason);
514 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
518 ret = tdb_transaction_commit(tdb);
519 if (ret != TDB_SUCCESS) {
520 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
521 tdb_name(tdb), ret, tdb_errorstr(tdb)));
522 talloc_free(new_reason);
526 talloc_free(old_reason);
527 ctdb_db->unhealthy_reason = new_reason;
532 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
533 struct ctdb_db_context *ctdb_db)
535 time_t now = time(NULL);
543 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
544 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
545 "%04u%02u%02u%02u%02u%02u.0Z",
547 tm->tm_year+1900, tm->tm_mon+1,
548 tm->tm_mday, tm->tm_hour, tm->tm_min,
550 if (new_path == NULL) {
551 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
555 new_reason = talloc_asprintf(ctdb_db,
556 "ERROR - Backup of corrupted TDB in '%s'",
558 if (new_reason == NULL) {
559 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
562 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
563 talloc_free(new_reason);
565 DEBUG(DEBUG_CRIT,(__location__
566 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
571 ret = rename(ctdb_db->db_path, new_path);
573 DEBUG(DEBUG_CRIT,(__location__
574 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
575 ctdb_db->db_path, new_path,
576 errno, strerror(errno)));
577 talloc_free(new_path);
581 DEBUG(DEBUG_CRIT,(__location__
582 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
583 ctdb_db->db_path, new_path));
584 talloc_free(new_path);
588 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
590 struct ctdb_db_context *ctdb_db;
595 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
596 if (!ctdb_db->persistent) {
600 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
602 DEBUG(DEBUG_ALERT,(__location__
603 " load persistent health for '%s' failed\n",
608 if (ctdb_db->unhealthy_reason == NULL) {
610 DEBUG(DEBUG_INFO,(__location__
611 " persistent db '%s' healthy\n",
617 DEBUG(DEBUG_ALERT,(__location__
618 " persistent db '%s' unhealthy: %s\n",
620 ctdb_db->unhealthy_reason));
622 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
623 ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
635 mark a database - as healthy
637 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
639 uint32_t db_id = *(uint32_t *)indata.dptr;
640 struct ctdb_db_context *ctdb_db;
642 bool may_recover = false;
644 ctdb_db = find_ctdb_db(ctdb, db_id);
646 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
650 if (ctdb_db->unhealthy_reason) {
654 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
656 DEBUG(DEBUG_ERR,(__location__
657 " ctdb_update_persistent_health(%s) failed\n",
662 if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
663 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
665 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
671 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
675 uint32_t db_id = *(uint32_t *)indata.dptr;
676 struct ctdb_db_context *ctdb_db;
679 ctdb_db = find_ctdb_db(ctdb, db_id);
681 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
685 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
687 DEBUG(DEBUG_ERR,(__location__
688 " ctdb_load_persistent_health(%s) failed\n",
694 if (ctdb_db->unhealthy_reason) {
695 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
696 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
703 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
707 if (ctdb_db->readonly) {
711 if (ctdb_db->persistent) {
712 DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
716 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
717 if (ropath == NULL) {
718 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
721 ctdb_db->rottdb = tdb_open(ropath,
722 ctdb->tunable.database_hash_size,
723 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
725 if (ctdb_db->rottdb == NULL) {
726 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
731 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
733 ctdb_db->readonly = true;
735 DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
742 attach to a database, handling both persistent and non-persistent databases
743 return 0 on success, -1 on failure
745 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
746 bool persistent, const char *unhealthy_reason,
747 bool jenkinshash, bool mutexes)
749 struct ctdb_db_context *ctdb_db, *tmp_db;
754 int remaining_tries = 0;
756 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
757 CTDB_NO_MEMORY(ctdb, ctdb_db);
759 ctdb_db->priority = 1;
760 ctdb_db->ctdb = ctdb;
761 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
762 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
764 key.dsize = strlen(db_name)+1;
765 key.dptr = discard_const(db_name);
766 ctdb_db->db_id = ctdb_hash(&key);
767 ctdb_db->persistent = persistent;
769 if (!ctdb_db->persistent) {
770 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
771 if (ctdb_db->delete_queue == NULL) {
772 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
775 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
778 /* check for hash collisions */
779 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
780 if (tmp_db->db_id == ctdb_db->db_id) {
781 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
782 tmp_db->db_id, db_name, tmp_db->db_name));
783 talloc_free(ctdb_db);
789 if (unhealthy_reason) {
790 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
791 unhealthy_reason, 0);
793 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
794 ctdb_db->db_name, unhealthy_reason, ret));
795 talloc_free(ctdb_db);
800 if (ctdb->max_persistent_check_errors > 0) {
803 if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
807 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
809 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
810 ctdb_db->db_name, ret));
811 talloc_free(ctdb_db);
816 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
817 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
818 ctdb_db->db_name, ctdb_db->unhealthy_reason));
819 talloc_free(ctdb_db);
823 if (ctdb_db->unhealthy_reason) {
824 /* this is just a warning, but we want that in the log file! */
825 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
826 ctdb_db->db_name, ctdb_db->unhealthy_reason));
829 /* open the database */
830 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
831 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
834 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
835 if (ctdb->valgrinding) {
836 tdb_flags |= TDB_NOMMAP;
838 tdb_flags |= TDB_DISALLOW_NESTING;
840 tdb_flags |= TDB_INCOMPATIBLE_HASH;
842 #ifdef TDB_MUTEX_LOCKING
843 if (ctdb->tunable.mutex_enabled && mutexes &&
844 tdb_runtime_check_for_robust_mutexes()) {
845 tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
850 ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
851 ctdb->tunable.database_hash_size,
853 O_CREAT|O_RDWR, mode);
854 if (ctdb_db->ltdb == NULL) {
856 int saved_errno = errno;
859 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
862 strerror(saved_errno)));
863 talloc_free(ctdb_db);
867 if (remaining_tries == 0) {
868 DEBUG(DEBUG_CRIT,(__location__
869 "Failed to open persistent tdb '%s': %d - %s\n",
872 strerror(saved_errno)));
873 talloc_free(ctdb_db);
877 ret = stat(ctdb_db->db_path, &st);
879 DEBUG(DEBUG_CRIT,(__location__
880 "Failed to open persistent tdb '%s': %d - %s\n",
883 strerror(saved_errno)));
884 talloc_free(ctdb_db);
888 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
890 DEBUG(DEBUG_CRIT,(__location__
891 "Failed to open persistent tdb '%s': %d - %s\n",
894 strerror(saved_errno)));
895 talloc_free(ctdb_db);
905 ctdb_check_db_empty(ctdb_db);
907 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
912 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
913 ctdb_db->db_path, ret,
914 tdb_errorstr(ctdb_db->ltdb->tdb)));
915 if (remaining_tries == 0) {
916 talloc_free(ctdb_db);
920 fd = tdb_fd(ctdb_db->ltdb->tdb);
921 ret = fstat(fd, &st);
923 DEBUG(DEBUG_CRIT,(__location__
924 "Failed to fstat() persistent tdb '%s': %d - %s\n",
928 talloc_free(ctdb_db);
933 talloc_free(ctdb_db->ltdb);
934 ctdb_db->ltdb = NULL;
936 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
938 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
940 talloc_free(ctdb_db);
950 /* set up a rb tree we can use to track which records we have a
951 fetch-lock in-flight for so we can defer any additional calls
954 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
955 if (ctdb_db->deferred_fetch == NULL) {
956 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
957 talloc_free(ctdb_db);
961 ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
962 if (ctdb_db->defer_dmaster == NULL) {
963 DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
965 talloc_free(ctdb_db);
969 DLIST_ADD(ctdb->db_list, ctdb_db);
971 /* setting this can help some high churn databases */
972 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
975 all databases support the "null" function. we need this in
976 order to do forced migration of records
978 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
980 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
981 talloc_free(ctdb_db);
986 all databases support the "fetch" function. we need this
987 for efficient Samba3 ctdb fetch
989 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
991 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
992 talloc_free(ctdb_db);
997 all databases support the "fetch_with_header" function. we need this
998 for efficient readonly record fetches
1000 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1002 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1003 talloc_free(ctdb_db);
1007 ret = ctdb_vacuum_init(ctdb_db);
1009 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1010 "database '%s'\n", ctdb_db->db_name));
1011 talloc_free(ctdb_db);
1015 ctdb_db->generation = ctdb->vnn_map->generation;
1017 DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
1018 ctdb_db->db_path, tdb_flags));
/*
 * A DB attach control that was deferred; entries are linked on
 * ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
	struct ctdb_deferred_attach_context *next, *prev;
	struct ctdb_context *ctdb;
	/* the deferred control request packet */
	struct ctdb_req_control *c;
};
1032 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1034 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1039 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1041 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1042 struct ctdb_context *ctdb = da_ctx->ctdb;
1044 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1045 talloc_free(da_ctx);
1048 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1050 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1051 struct ctdb_context *ctdb = da_ctx->ctdb;
1053 /* This talloc-steals the packet ->c */
1054 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1055 talloc_free(da_ctx);
1058 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1060 struct ctdb_deferred_attach_context *da_ctx;
1062 /* call it from the main event loop as soon as the current event
1065 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1066 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1067 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1074 a client has asked to attach a new database
1076 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1077 TDB_DATA *outdata, uint64_t tdb_flags,
1078 bool persistent, uint32_t client_id,
1079 struct ctdb_req_control *c,
1082 const char *db_name = (const char *)indata.dptr;
1083 struct ctdb_db_context *db;
1084 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1085 struct ctdb_client *client = NULL;
1086 bool with_jenkinshash, with_mutexes;
1088 if (ctdb->tunable.allow_client_db_attach == 0) {
1089 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1090 "AllowClientDBAccess == 0\n", db_name));
1094 /* dont allow any local clients to attach while we are in recovery mode
1095 * except for the recovery daemon.
1096 * allow all attach from the network since these are always from remote
1099 if (client_id != 0) {
1100 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1102 if (client != NULL) {
1103 /* If the node is inactive it is not part of the cluster
1104 and we should not allow clients to attach to any
1107 if (node->flags & NODE_FLAGS_INACTIVE) {
1108 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
1112 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
1113 client->pid != ctdb->recoverd_pid &&
1114 ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
1115 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1117 if (da_ctx == NULL) {
1118 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1122 da_ctx->ctdb = ctdb;
1123 da_ctx->c = talloc_steal(da_ctx, c);
1124 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1125 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1127 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1129 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1130 *async_reply = true;
1135 /* the client can optionally pass additional tdb flags, but we
1136 only allow a subset of those on the database in ctdb. Note
1137 that tdb_flags is passed in via the (otherwise unused)
1138 srvid to the attach control */
1139 #ifdef TDB_MUTEX_LOCKING
1140 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
1142 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1145 /* see if we already have this name */
1146 db = ctdb_db_handle(ctdb, db_name);
1148 if (db->persistent != persistent) {
1149 DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
1150 "database %s\n", persistent ? "" : "non-",
1151 db-> persistent ? "" : "non-", db_name));
1154 outdata->dptr = (uint8_t *)&db->db_id;
1155 outdata->dsize = sizeof(db->db_id);
1156 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1160 with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
1161 #ifdef TDB_MUTEX_LOCKING
1162 with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
1164 with_mutexes = false;
1167 if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
1168 with_jenkinshash, with_mutexes) != 0) {
1172 db = ctdb_db_handle(ctdb, db_name);
1174 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1178 /* remember the flags the client has specified */
1179 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1181 outdata->dptr = (uint8_t *)&db->db_id;
1182 outdata->dsize = sizeof(db->db_id);
1184 /* Try to ensure it's locked in mem */
1185 lockdown_memory(ctdb->valgrinding);
1187 /* tell all the other nodes about this database */
1188 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1189 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1190 CTDB_CONTROL_DB_ATTACH,
1191 0, CTDB_CTRL_FLAG_NOREPLY,
1192 indata, NULL, NULL);
1199 * a client has asked to detach from a database
1201 int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
1205 struct ctdb_db_context *ctdb_db;
1206 struct ctdb_client *client = NULL;
1208 db_id = *(uint32_t *)indata.dptr;
1209 ctdb_db = find_ctdb_db(ctdb, db_id);
1210 if (ctdb_db == NULL) {
1211 DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
1216 if (ctdb->tunable.allow_client_db_attach == 1) {
1217 DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
1218 "Clients are allowed access to databases "
1219 "(AllowClientDBAccess == 1)\n",
1224 if (ctdb_db->persistent) {
1225 DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
1226 "denied\n", ctdb_db->db_name));
1230 /* Cannot detach from database when in recovery */
1231 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
1232 DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
1236 /* If a control comes from a client, then broadcast it to all nodes.
1237 * Do the actual detach only if the control comes from other daemons.
1239 if (client_id != 0) {
1240 client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1241 if (client != NULL) {
1242 /* forward the control to all the nodes */
1243 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1244 CTDB_CONTROL_DB_DETACH, 0,
1245 CTDB_CTRL_FLAG_NOREPLY,
1246 indata, NULL, NULL);
1249 DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
1250 "for database '%s'\n", ctdb_db->db_name));
1254 /* Detach database from recoverd */
1255 if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
1256 CTDB_SRVID_DETACH_DATABASE,
1258 DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
1262 /* Disable vacuuming and drop all vacuuming data */
1263 talloc_free(ctdb_db->vacuum_handle);
1264 talloc_free(ctdb_db->delete_queue);
1266 /* Terminate any deferred fetch */
1267 talloc_free(ctdb_db->deferred_fetch);
1269 /* Terminate any traverses */
1270 while (ctdb_db->traverse) {
1271 talloc_free(ctdb_db->traverse);
1274 /* Terminate any revokes */
1275 while (ctdb_db->revokechild_active) {
1276 talloc_free(ctdb_db->revokechild_active);
1279 /* Free readonly tracking database */
1280 if (ctdb_db->readonly) {
1281 talloc_free(ctdb_db->rottdb);
1284 DLIST_REMOVE(ctdb->db_list, ctdb_db);
1286 DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1288 talloc_free(ctdb_db);
1294 attach to all existing persistent databases
/*
 * Walk ctdb->db_directory_persistent and hand every matching directory
 * entry to ctdb_local_attach().
 *
 * ctdb:             daemon context; supplies the directory path and the
 *                   local node number (ctdb->pnn) used for name matching.
 * unhealthy_reason: forwarded unchanged to ctdb_local_attach().
 *
 * NOTE(review): the embedded numbering skips lines in this listing, so the
 * local declarations, the bodies of the rejection branches and the return
 * statements are not visible here - confirm them against the full file.
 */
1296 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1297 const char *unhealthy_reason)
1302 /* open the persistent db directory and scan it for files */
1303 d = opendir(ctdb->db_directory_persistent);
/* one iteration per directory entry returned by readdir() */
1308 while ((de=readdir(d))) {
1310 size_t len = strlen(de->d_name);
1312 int invalid_name = 0;
/* work on a talloc copy of the entry name, allocated under ctdb */
1314 s = talloc_strdup(ctdb, de->d_name);
/* presumably bails out when s is NULL - the macro is defined elsewhere */
1317 CTDB_NO_MEMORY(ctdb, s);
1320 /* candidates must be at least 7 characters long and contain .tdb. */
1321 p = strstr(s, ".tdb.");
1322 if (len < 7 || p == NULL) {
1327 /* only accept names ending with .tdb. and any number of digits */
/*
 * NOTE(review): isdigit() is handed a plain char here; the C library
 * contract wants a value representable as unsigned char (or EOF), so a
 * cast to unsigned char would be safer for bytes above 0x7f.
 */
1329 while (*q != 0 && invalid_name == 0) {
1330 if (!isdigit(*q++)) {
/*
 * p+5 is the text right after the five-character .tdb. match; it must
 * parse as an unsigned number equal to this node number, otherwise the
 * entry is reported as ignored.
 */
1334 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1335 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
/* attach using the copied name; a non-zero result is logged as a failure */
1341 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
1342 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1348 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Open the persistent-health tdb and then attach to the persistent
 * databases.
 *
 * Steps visible in this listing:
 *  - build the tdb path from ctdb->db_directory_state and
 *    PERSISTENT_HEALTH_TDB (the format string also consumes an unsigned
 *    argument whose line is not shown here);
 *  - open the file with tdb_wrap_open() into ctdb->db_persistent_health;
 *  - validate it with tdb_check();
 *  - when either step fails, log it, build an unhealthy_reason warning
 *    string and open the file again with TDB_CLEAR_IF_FIRST added;
 *  - call ctdb_attach_persistent() with unhealthy_reason, which is still
 *    NULL unless one of the failure paths below assigned it.
 *
 * NOTE(review): first_try is initialised here but every use of it, the
 * retry control flow and all return statements fall on lines missing from
 * this listing - confirm them against the full file.
 */
1356 int ctdb_attach_databases(struct ctdb_context *ctdb)
1359 char *persistent_health_path = NULL;
1360 char *unhealthy_reason = NULL;
1361 bool first_try = true;
/* path of the health tdb, allocated under ctdb */
1363 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1364 ctdb->db_directory_state,
1365 PERSISTENT_HEALTH_TDB,
1367 if (persistent_health_path == NULL) {
1368 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* normal open: create if needed, read-write, mode 0600, no nesting */
1374 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1375 0, TDB_DISALLOW_NESTING,
1376 O_CREAT | O_RDWR, 0600);
/* failure path 1: the open itself failed */
1377 if (ctdb->db_persistent_health == NULL) {
1378 struct tdb_wrap *tdb;
1381 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1382 persistent_health_path,
1385 talloc_free(persistent_health_path);
1386 talloc_free(unhealthy_reason);
/* warning text that is later handed to ctdb_attach_persistent() */
1391 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1392 persistent_health_path,
1393 "was cleared after a failure",
1394 "manual verification needed");
1395 if (unhealthy_reason == NULL) {
1396 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1397 talloc_free(persistent_health_path);
1401 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1402 persistent_health_path));
/* second open of the same path, this time with TDB_CLEAR_IF_FIRST */
1403 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1404 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1405 O_CREAT | O_RDWR, 0600);
1407 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1408 persistent_health_path,
1411 talloc_free(persistent_health_path);
1412 talloc_free(unhealthy_reason);
/* failure path 2: the file opened but must also pass tdb_check() */
1419 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1421 struct tdb_wrap *tdb;
/* drop the handle that failed the check before the file is reopened */
1423 talloc_free(ctdb->db_persistent_health);
1424 ctdb->db_persistent_health = NULL;
1427 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1428 persistent_health_path));
1429 talloc_free(persistent_health_path);
1430 talloc_free(unhealthy_reason);
/* same warning text as in the open-failure path */
1435 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1436 persistent_health_path,
1437 "was cleared after a failure",
1438 "manual verification needed");
1439 if (unhealthy_reason == NULL) {
1440 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1441 talloc_free(persistent_health_path);
1445 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1446 persistent_health_path));
/* reopen with TDB_CLEAR_IF_FIRST, mirroring the open-failure path */
1447 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1448 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1449 O_CREAT | O_RDWR, 0600);
1451 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1452 persistent_health_path,
1455 talloc_free(persistent_health_path);
1456 talloc_free(unhealthy_reason);
/* the path string is not used after this point */
1463 talloc_free(persistent_health_path);
/* attach to the persistent databases, passing along any warning text */
1465 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1466 talloc_free(unhealthy_reason);
1475 called when a broadcast seqnum update comes in
/*
 * Handle a sequence-number update for database db_id that originated on
 * node srcnode.
 *
 * An update whose source is this node (srcnode equals ctdb->pnn) takes the
 * early branch below. Otherwise the database is looked up with
 * find_ctdb_db(), an unhealthy database is reported, and the local tdb
 * sequence number is incremented and cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements and the test guarding the Unknown
 * db_id message fall on lines missing from this listing.
 */
1477 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1479 struct ctdb_db_context *ctdb_db;
1480 if (srcnode == ctdb->pnn) {
1481 /* don't update ourselves! */
/* resolve the database id to its local context */
1485 ctdb_db = find_ctdb_db(ctdb, db_id);
1487 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/*
 * A non-NULL unhealthy_reason marks the database as unhealthy.
 * NOTE(review): the log text below spells it unhealty; it is runtime
 * output and is left untouched here.
 */
1491 if (ctdb_db->unhealthy_reason) {
1492 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1493 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the tdb sequence number, then cache the value the tdb reports */
1497 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1498 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1503 timer to check for seqnum changes in an ltdb and propagate them
/*
 * Timed-event callback that watches one database for local sequence-number
 * changes.
 *
 * p is the struct ctdb_db_context that was registered with
 * event_add_timed(); ev, te and t are not referenced on any line visible
 * here.
 *
 * When the tdb sequence number differs from the cached ctdb_db->seqnum, a
 * CTDB_CONTROL_UPDATE_SEQNUM control carrying the 32-bit db_id is sent to
 * CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY. ctdb_db->seqnum is set
 * to the new value and a new timer is registered for this same callback.
 */
1505 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1506 struct timeval t, void *p)
1508 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1509 struct ctdb_context *ctdb = ctdb_db->ctdb;
1510 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1511 if (new_seqnum != ctdb_db->seqnum) {
1512 /* something has changed - propagate it */
/* the control payload is just the database id */
1514 data.dptr = (uint8_t *)&ctdb_db->db_id;
1515 data.dsize = sizeof(uint32_t);
1516 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1517 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1520 ctdb_db->seqnum = new_seqnum;
1522 /* setup a new timer */
/*
 * seqnum_interval is split into a quotient by 1000 and a remainder scaled
 * by 1000 - presumably milliseconds turned into seconds plus microseconds;
 * confirm against the timeval_current_ofs() prototype.
 */
1523 ctdb_db->seqnum_update =
1524 event_add_timed(ctdb->ev, ctdb_db,
1525 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1526 ctdb_ltdb_seqnum_check, ctdb_db);
1530 enable seqnum handling on this db
/*
 * Enable sequence-number tracking for database db_id.
 *
 * Registers the ctdb_ltdb_seqnum_check() timer when none is recorded in
 * ctdb_db->seqnum_update, turns on sequence numbers in the underlying tdb
 * and caches the current value in ctdb_db->seqnum.
 *
 * NOTE(review): the test guarding the Unknown db_id message and the return
 * statements fall on lines missing from this listing.
 */
1532 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1534 struct ctdb_db_context *ctdb_db;
/* resolve the database id to its local context */
1535 ctdb_db = find_ctdb_db(ctdb, db_id);
1537 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only arm the timer once; an existing seqnum_update is left alone */
1541 if (ctdb_db->seqnum_update == NULL) {
1542 ctdb_db->seqnum_update =
1543 event_add_timed(ctdb->ev, ctdb_db,
1544 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1545 ctdb_ltdb_seqnum_check, ctdb_db);
/* switch on seqnum maintenance in the tdb and record the starting value */
1548 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1549 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler that sets the priority of one database.
 *
 * indata.dptr is interpreted as a struct ctdb_db_priority carrying db_id
 * and priority. The database is looked up with find_ctdb_db(), the
 * priority is rejected when it lies outside 1..NUM_DB_PRIORITIES, and an
 * accepted value is stored in ctdb_db->priority. When client_id is
 * non-zero the same control is sent on to CTDB_BROADCAST_ALL with
 * CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): no check of indata.dsize is visible before the cast below;
 * confirm that the caller validates the payload size. The rest of the
 * parameter list, the lookup-failure test and the return statements fall
 * on lines missing from this listing.
 */
1553 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
1556 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1557 struct ctdb_db_context *ctdb_db;
/* resolve the database named in the payload */
1559 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
/* the Unknown db_id message is only logged when this node is not flagged inactive */
1561 if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
1562 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
/* valid priorities are 1 through NUM_DB_PRIORITIES inclusive */
1568 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1569 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1573 ctdb_db->priority = db_prio->priority;
1574 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
/* a non-zero client_id triggers the broadcast of the unchanged payload */
1576 if (client_id != 0) {
1577 /* Broadcast the update to the rest of the cluster */
1578 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
1579 CTDB_CONTROL_SET_DB_PRIORITY, 0,
1580 CTDB_CTRL_FLAG_NOREPLY, indata,
/*
 * Mark ctdb_db as a sticky database.
 *
 * A database that is already sticky takes the first branch below and a
 * persistent database is reported as an error. Otherwise a tree for the
 * sticky records is created with trbt_create() under ctdb_db, the sticky
 * flag is set and the change is logged.
 *
 * ctdb is not referenced on any line visible here.
 *
 * NOTE(review): the bodies of the two early branches and the return
 * statements fall on lines missing from this listing.
 */
1587 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
/* already sticky */
1589 if (ctdb_db->sticky) {
/* persistent databases are refused */
1593 if (ctdb_db->persistent) {
1594 DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
/* the tree is allocated with ctdb_db as its talloc parent */
1598 ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
1600 ctdb_db->sticky = true;
1602 DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
/*
 * Reset the statistics kept for ctdb_db.
 *
 * The key buffer of every hot-key slot with a non-zero dsize is released
 * with talloc_free(), then the whole statistics structure is zeroed.
 */
1607 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
1609 struct ctdb_db_statistics *s = &ctdb_db->statistics;
/* free the key buffers first; zeroing below would otherwise lose the pointers */
1612 for (i=0; i<MAX_HOT_KEYS; i++) {
1613 if (s->hot_keys[i].key.dsize > 0) {
1614 talloc_free(s->hot_keys[i].key.dptr);
1618 ZERO_STRUCT(ctdb_db->statistics);
/*
 * Control handler that serialises the statistics of one database into
 * outdata.
 *
 * The reply is a single talloc buffer, parented to outdata, laid out as:
 *  - the leading part of struct ctdb_db_statistics up to the
 *    hot_keys_wire member, copied verbatim from ctdb_db->statistics;
 *  - the key bytes of all MAX_HOT_KEYS hot-key slots, appended back to
 *    back starting at hot_keys_wire.
 * num_hot_keys in the reply is always set to MAX_HOT_KEYS.
 *
 * NOTE(review): the remaining parameters, where db_id comes from, the
 * lookup-failure test and the return statements fall on lines missing from
 * this listing.
 */
1621 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1625 struct ctdb_db_context *ctdb_db;
1626 struct ctdb_db_statistics *stats;
1631 ctdb_db = find_ctdb_db(ctdb, db_id);
1633 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
/* reply size: fixed leading part plus the bytes of every hot key */
1637 len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
1638 for (i = 0; i < MAX_HOT_KEYS; i++) {
1639 len += ctdb_db->statistics.hot_keys[i].key.dsize;
1642 stats = talloc_size(outdata, len);
1643 if (stats == NULL) {
1644 DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
/* copy everything that precedes hot_keys_wire in one go */
1648 memcpy(stats, &ctdb_db->statistics,
1649 offsetof(struct ctdb_db_statistics, hot_keys_wire));
1651 stats->num_hot_keys = MAX_HOT_KEYS;
/*
 * Append the key bytes one slot after another.
 * NOTE(review): a slot with dsize 0 yields a zero-length memcpy; after
 * ctdb_db_statistics_reset() has zeroed the structure its dptr is likely
 * NULL, which is formally undefined for memcpy - consider skipping empty
 * slots.
 */
1653 ptr = &stats->hot_keys_wire[0];
1654 for (i = 0; i < MAX_HOT_KEYS; i++) {
1655 memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
1656 ctdb_db->statistics.hot_keys[i].key.dsize);
1657 ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
/* hand the buffer and its length back to the caller */
1660 outdata->dptr = (uint8_t *)stats;
1661 outdata->dsize = len;