2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
/* Base file name of the tdb that stores per-database health state.
 * ctdb_attach_databases() joins it with ctdb->db_directory_state and a
 * numeric suffix to form the path it opens. */
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 this is the dummy null procedure that all databases support
/*
 * Null call handler. ctdb_local_attach() registers it for every database
 * under CTDB_NULL_FUNC.
 * NOTE(review): the body is not visible in this listing; presumably it
 * just reports success without touching the record - confirm.
 */
38 static int ctdb_null_func(struct ctdb_call_info *call)
44 this is a plain fetch procedure that all databases support
/*
 * Plain fetch call handler. Points call->reply_data at call->record_data,
 * so the reply refers to the record data itself rather than to a copy.
 * ctdb_local_attach() registers it for every database under
 * CTDB_FETCH_FUNC.
 * NOTE(review): the return statement is not visible in this listing.
 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
48 call->reply_data = &call->record_data;
54 * write a record to a normal database
56 * This is the server-variant of the ctdb_ltdb_store function.
57 * It contains logic to determine whether a record should be
/*
 * Server-side store for a normal (ltdb) record.
 *
 * ctdb_db  - database to write to (its ltdb->tdb is the target)
 * key      - record key (its declaration is not visible in this listing)
 * header   - ltdb header written in front of the data; this function
 *            clears CTDB_REC_FLAG_VACUUM_MIGRATED in header->flags
 * data     - record payload appended after the header
 *
 * Visible flow:
 *  - With CTDB_FLAG_TORTURE set, the current record is fetched and an
 *    RSN regression is logged if the stored rsn exceeds header->rsn.
 *  - A chain of tests (non-empty data, persistent db, MIGRATED_WITH_DATA,
 *    lmaster/dmaster role, VACUUM_MIGRATED) decides between storing and
 *    deleting; the debug message and the tdb_store/tdb_delete calls are
 *    driven by a keep flag whose assignments are not visible here.
 *  - header and data are copied into one talloc buffer that is released
 *    with talloc_free() after the store/delete.
 *  - When ctdb_db->seqnum_update is set and the old record has the same
 *    size and identical payload, TDB_SEQNUM is removed for the write and
 *    added back afterwards, so an unchanged payload does not bump the
 *    sequence number.
 *
 * NOTE(review): the torture check tests rec.dsize against sizeof(h2),
 * which is the size of a pointer, not of the header it points to;
 * sizeof(*h2) looks like what was meant - confirm.
 * NOTE(review): the return statements are not visible in this listing.
 */
60 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
62 struct ctdb_ltdb_header *header,
65 struct ctdb_context *ctdb = ctdb_db->ctdb;
68 bool seqnum_suppressed = false;
72 if (ctdb->flags & CTDB_FLAG_TORTURE) {
73 struct ctdb_ltdb_header *h2;
74 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
75 h2 = (struct ctdb_ltdb_header *)rec.dptr;
76 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
77 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
78 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
80 if (rec.dptr) free(rec.dptr);
83 if (ctdb->vnn_map == NULL) {
85 * Called from a client: always store the record
86 * Also don't call ctdb_lmaster since it uses the vnn_map!
92 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
95 * If we migrate an empty record off to another node
96 * and the record has not been migrated with data,
97 * delete the record instead of storing the empty record.
99 if (data.dsize != 0) {
101 } else if (ctdb_db->persistent) {
103 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
105 } else if (ctdb_db->ctdb->pnn == lmaster) {
107 * If we are lmaster, then we usually keep the record.
108 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
109 * and the record is empty and has never been migrated
110 * with data, then we should delete it instead of storing it.
111 * This is part of the vacuuming process.
113 * The reason that we usually need to store even empty records
114 * on the lmaster is that a client operating directly on the
115 * lmaster (== dmaster) expects the local copy of the record to
116 * exist after successful ctdb migrate call. If the record does
117 * not exist, the client goes into a migrate loop and eventually
118 * fails. So storing the empty record makes sure that we do not
119 * need to change the client code.
121 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
123 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
126 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
132 * The VACUUM_MIGRATED flag is only set temporarily for
133 * the above logic when the record was retrieved by a
134 * VACUUM_MIGRATE call and should not be stored in the
137 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
138 * and there are two cases in which the corresponding record
139 * is stored in the local database:
140 * 1. The record has been migrated with data in the past
141 * (the MIGRATED_WITH_DATA record flag is set).
142 * 2. The record has been filled with data again since it
143 * had been submitted in the VACUUM_FETCH message to the
145 * For such records it is important to not store the
146 * VACUUM_MIGRATED flag in the database.
148 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
150 rec.dsize = sizeof(*header) + data.dsize;
151 rec.dptr = talloc_size(ctdb, rec.dsize);
152 CTDB_NO_MEMORY(ctdb, rec.dptr);
154 memcpy(rec.dptr, header, sizeof(*header));
155 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
157 /* Databases with seqnum updates enabled only get their seqnum
158 changes when/if we modify the data */
159 if (ctdb_db->seqnum_update != NULL) {
161 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
163 if ( (old.dsize == rec.dsize)
164 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
165 rec.dptr+sizeof(struct ctdb_ltdb_header),
166 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
167 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
168 seqnum_suppressed = true;
170 if (old.dptr) free(old.dptr);
173 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
175 keep?"storing":"deleting",
179 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
181 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
185 DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
187 if (seqnum_suppressed) {
188 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
191 talloc_free(rec.dptr);
/*
 * State handed from ctdb_ltdb_lock_requeue() to lock_fetch_callback().
 * Besides the members shown, the code also uses state->recv_context and
 * state->generation, whose declarations are not visible in this listing.
 */
196 struct lock_fetch_state {
197 struct ctdb_context *ctdb;
198 void (*recv_pkt)(void *, struct ctdb_req_header *);
200 struct ctdb_req_header *hdr;
202 bool ignore_generation;
206 called when we should retry the operation
/*
 * Callback passed to ctdb_lockwait() by ctdb_ltdb_lock_requeue().
 * p is the lock_fetch_state. Unless ignore_generation is set, a packet
 * whose saved generation differs from the current vnn_map generation is
 * logged as discarded and its header is freed. The header is otherwise
 * handed back to state->recv_pkt together with state->recv_context.
 * NOTE(review): the early return that presumably follows the discard is
 * not visible in this listing - confirm.
 */
208 static void lock_fetch_callback(void *p)
210 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
211 if (!state->ignore_generation &&
212 state->generation != state->ctdb->vnn_map->generation) {
213 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
214 talloc_free(state->hdr);
217 state->recv_pkt(state->recv_context, state->hdr);
218 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
223 do a non-blocking ltdb_lock, deferring this ctdb request until we
226 It does the following:
228 1) tries to get the chainlock. If it succeeds, then it returns 0
230 2) if it fails to get a chainlock immediately then it sets up a
231 non-blocking chainlock via ctdb_lockwait, and when it gets the
232 chainlock it re-submits this ctdb request to the main packet
235 This effectively queues all ctdb requests that cannot be
236 immediately satisfied until it can get the lock. This means that
237 the main ctdb daemon will not block waiting for a chainlock held by
240 There are 3 possible return values:
242 0: means that it got the lock immediately.
243 -1: means that it failed to get the lock, and won't retry
244 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking; if it is contended,
 * queue the request for a retry (return values are described in the
 * comment block preceding this function).
 *
 * ctdb_db           - database whose ltdb is locked
 * key               - record key to lock
 * hdr               - request packet; on the contended path it is moved
 *                     with talloc_steal() so it outlives the caller
 * recv_pkt          - function the packet is re-submitted to on retry
 * recv_context      - first argument passed to recv_pkt
 * ignore_generation - stored in the state; makes the callback skip its
 *                     generation check
 *
 * A failure whose errno is not EACCES, EAGAIN or EDEADLK is treated as
 * hard. In torture mode (subject to a further condition not visible
 * here) the lock is released again so the contended path is exercised.
 *
 * NOTE(review): state->ctdb is assigned directly after talloc() with no
 * NULL check visible - confirm allocation failure is handled.
 * NOTE(review): state is allocated as a talloc child of hdr and hdr is
 * later stolen onto state, which makes the two each other's parent; the
 * talloc documentation warns about such loops - confirm this is intended.
 */
246 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
247 TDB_DATA key, struct ctdb_req_header *hdr,
248 void (*recv_pkt)(void *, struct ctdb_req_header *),
249 void *recv_context, bool ignore_generation)
252 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
253 struct lockwait_handle *h;
254 struct lock_fetch_state *state;
256 ret = tdb_chainlock_nonblock(tdb, key);
259 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
260 /* a hard failure - don't try again */
264 /* when torturing, ensure we test the contended path */
265 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
268 tdb_chainunlock(tdb, key);
271 /* first the non-contended path */
276 state = talloc(hdr, struct lock_fetch_state);
277 state->ctdb = ctdb_db->ctdb;
279 state->recv_pkt = recv_pkt;
280 state->recv_context = recv_context;
281 state->generation = ctdb_db->ctdb->vnn_map->generation;
282 state->ignore_generation = ignore_generation;
284 /* now the contended path */
285 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
290 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
291 so it won't be freed yet */
292 talloc_steal(state, hdr);
293 talloc_steal(state, h);
295 /* now tell the caller that we will retry asynchronously */
300 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Variant of ctdb_ltdb_lock_requeue() that also fetches the record.
 * The lock is requested first with the same arguments; the record is
 * then read with ctdb_ltdb_fetch() into header and data, and
 * ctdb_ltdb_unlock() is called with any unlock failure being logged.
 * NOTE(review): the conditions guarding the fetch and the unlock, and
 * the return statement, are not visible in this listing; presumably the
 * fetch runs only once the lock is held and the unlock only when the
 * fetch fails - confirm.
 */
302 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
303 TDB_DATA key, struct ctdb_ltdb_header *header,
304 struct ctdb_req_header *hdr, TDB_DATA *data,
305 void (*recv_pkt)(void *, struct ctdb_req_header *),
306 void *recv_context, bool ignore_generation)
310 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
311 recv_context, ignore_generation);
313 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
316 uret = ctdb_ltdb_unlock(ctdb_db, key);
318 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
327 paranoid check to see if the db is empty
/*
 * Sanity check used on attach: counts records with tdb_traverse_read()
 * (NULL callback) and, on the not-empty branch, logs an alert and calls
 * ctdb_fatal().
 * NOTE(review): the test on count is not visible; presumably count > 0.
 */
329 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
331 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
332 int count = tdb_traverse_read(tdb, NULL, NULL);
334 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
336 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Reload ctdb_db->unhealthy_reason from the persistent health tdb.
 * The lookup key is the database name without its terminating NUL.
 * The previous reason is set aside and the field cleared before the
 * fetch; a fetched value is duplicated as a string owned by ctdb_db and
 * becomes the new reason. If that duplication fails, the previous reason
 * is put back.
 * NOTE(review): release of the old reason and of val.dptr, and the
 * return values, are not visible in this listing.
 */
340 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
341 struct ctdb_db_context *ctdb_db)
343 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
349 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
350 key.dsize = strlen(ctdb_db->db_name);
352 old = ctdb_db->unhealthy_reason;
353 ctdb_db->unhealthy_reason = NULL;
355 val = tdb_fetch(tdb, key);
357 reason = talloc_strndup(ctdb_db,
358 (const char *)val.dptr,
360 if (reason == NULL) {
361 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
363 ctdb_db->unhealthy_reason = old;
374 ctdb_db->unhealthy_reason = reason;
/*
 * Record a new health state for ctdb_db in the persistent health tdb,
 * inside a tdb transaction, and mirror it in ctdb_db->unhealthy_reason.
 *
 * given_reason      - text describing the problem; NULL means healthy
 * num_healthy_nodes - when this is 0 and a reason is already stored, the
 *                     stored reason is kept and prefixed with
 *                     NO-HEALTHY-NODES (unless it already starts with it)
 *
 * A resulting reason is written with tdb_store(TDB_REPLACE); with no new
 * reason but an old one present, the entry is deleted. Store and delete
 * failures cancel the transaction. After a successful commit the old
 * string is freed and replaced by the new one.
 * NOTE(review): _TMP_PREFIX starts with an underscore followed by an
 * upper-case letter, a form C reserves for the implementation.
 * NOTE(review): whether the other error paths cancel the transaction is
 * not visible in this listing - confirm.
 */
378 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
379 struct ctdb_db_context *ctdb_db,
380 const char *given_reason,/* NULL means healthy */
381 int num_healthy_nodes)
383 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
387 char *new_reason = NULL;
388 char *old_reason = NULL;
390 ret = tdb_transaction_start(tdb);
392 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
393 tdb_name(tdb), ret, tdb_errorstr(tdb)));
397 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
399 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
400 ctdb_db->db_name, ret));
403 old_reason = ctdb_db->unhealthy_reason;
405 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
406 key.dsize = strlen(ctdb_db->db_name);
409 new_reason = talloc_strdup(ctdb_db, given_reason);
410 if (new_reason == NULL) {
411 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
415 } else if (old_reason && num_healthy_nodes == 0) {
417 * If the reason indicates ok, but there where no healthy nodes
418 * available, that it means, we have not recovered valid content
419 * of the db. So if there's an old reason, prefix it with
420 * "NO-HEALTHY-NODES - "
424 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
425 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
427 prefix = _TMP_PREFIX;
431 new_reason = talloc_asprintf(ctdb_db, "%s%s",
433 if (new_reason == NULL) {
434 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
435 prefix, old_reason));
442 val.dptr = discard_const_p(uint8_t, new_reason);
443 val.dsize = strlen(new_reason);
445 ret = tdb_store(tdb, key, val, TDB_REPLACE);
447 tdb_transaction_cancel(tdb);
448 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
449 tdb_name(tdb), ctdb_db->db_name, new_reason,
450 ret, tdb_errorstr(tdb)));
451 talloc_free(new_reason);
454 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
455 ctdb_db->db_name, new_reason));
456 } else if (old_reason) {
457 ret = tdb_delete(tdb, key);
459 tdb_transaction_cancel(tdb);
460 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
461 tdb_name(tdb), ctdb_db->db_name,
462 ret, tdb_errorstr(tdb)));
463 talloc_free(new_reason);
466 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
470 ret = tdb_transaction_commit(tdb);
471 if (ret != TDB_SUCCESS) {
472 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
473 tdb_name(tdb), ret, tdb_errorstr(tdb)));
474 talloc_free(new_reason);
478 talloc_free(old_reason);
479 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb out of the way.
 * The target name is a base path (presumably db_path; that argument is
 * not visible) followed by .corrupted., a 14-digit timestamp and .0Z.
 * Before renaming, the database is marked unhealthy through
 * ctdb_update_persistent_health() with 0 healthy nodes and a reason
 * naming the backup. The rename result is logged either way and
 * new_path is freed on both visible exits.
 * NOTE(review): the log text after the health update says not
 * implemented yet although it appears to sit on that call's failure
 * path - confirm the message.
 * NOTE(review): how tm is obtained is not visible; the Z suffix suggests
 * UTC - confirm.
 */
484 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
485 struct ctdb_db_context *ctdb_db)
487 time_t now = time(NULL);
495 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
496 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
497 "%04u%02u%02u%02u%02u%02u.0Z",
499 tm->tm_year+1900, tm->tm_mon+1,
500 tm->tm_mday, tm->tm_hour, tm->tm_min,
502 if (new_path == NULL) {
503 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
507 new_reason = talloc_asprintf(ctdb_db,
508 "ERROR - Backup of corrupted TDB in '%s'",
510 if (new_reason == NULL) {
511 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
514 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
515 talloc_free(new_reason);
517 DEBUG(DEBUG_CRIT,(__location__
518 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
523 ret = rename(ctdb_db->db_path, new_path);
525 DEBUG(DEBUG_CRIT,(__location__
526 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
527 ctdb_db->db_path, new_path,
528 errno, strerror(errno)));
529 talloc_free(new_path);
533 DEBUG(DEBUG_CRIT,(__location__
534 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
535 ctdb_db->db_path, new_path));
536 talloc_free(new_path);
/*
 * Reload the health state of every persistent database on ctdb->db_list
 * and log each one as healthy or unhealthy, followed by a summary line
 * with OK and FAIL counts (logged at alert level when fail is non-zero).
 * Non-persistent databases are skipped.
 * NOTE(review): the counters and the return value are not visible in
 * this listing.
 */
540 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
542 struct ctdb_db_context *ctdb_db;
547 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
548 if (!ctdb_db->persistent) {
552 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
554 DEBUG(DEBUG_ALERT,(__location__
555 " load persistent health for '%s' failed\n",
560 if (ctdb_db->unhealthy_reason == NULL) {
562 DEBUG(DEBUG_INFO,(__location__
563 " persistent db '%s' healthy\n",
569 DEBUG(DEBUG_ALERT,(__location__
570 " persistent db '%s' unhealthy: %s\n",
572 ctdb_db->unhealthy_reason));
574 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
575 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
587 mark a database - as healthy
/*
 * Control handler: mark a database as healthy.
 * indata carries the uint32_t db_id. The stored health entry is cleared
 * by calling ctdb_update_persistent_health() with a NULL reason and one
 * healthy node. If may_recover is set and startup has not completed,
 * recovery mode is switched to CTDB_RECOVERY_ACTIVE.
 * NOTE(review): may_recover is presumably set when the database had an
 * unhealthy_reason; that assignment is not visible here.
 * NOTE(review): no length check on indata is visible before it is
 * dereferenced; presumably the caller validates it - confirm.
 */
589 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
591 uint32_t db_id = *(uint32_t *)indata.dptr;
592 struct ctdb_db_context *ctdb_db;
594 bool may_recover = false;
596 ctdb_db = find_ctdb_db(ctdb, db_id);
598 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
602 if (ctdb_db->unhealthy_reason) {
606 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
608 DEBUG(DEBUG_ERR,(__location__
609 " ctdb_update_persistent_health(%s) failed\n",
614 if (may_recover && !ctdb->done_startup) {
615 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
617 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the health of a database.
 * indata carries the uint32_t db_id. The health state is reloaded first.
 * For an unhealthy database outdata points straight at
 * ctdb_db->unhealthy_reason (no copy) and its size includes the
 * terminating NUL.
 * NOTE(review): the healthy case and the return values are not visible
 * in this listing.
 */
623 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
627 uint32_t db_id = *(uint32_t *)indata.dptr;
628 struct ctdb_db_context *ctdb_db;
631 ctdb_db = find_ctdb_db(ctdb, db_id);
633 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
637 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
639 DEBUG(DEBUG_ERR,(__location__
640 " ctdb_load_persistent_health(%s) failed\n",
646 if (ctdb_db->unhealthy_reason) {
647 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
648 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
655 attach to a database, handling both persistent and non-persistent databases
656 return 0 on success, -1 on failure
/*
 * Attach to a local database, persistent or not (0 on success, -1 on
 * failure per the comment preceding this function).
 *
 * ctdb             - daemon context; owns the new ctdb_db_context
 * db_name          - database name; db_id is ctdb_hash() over the name
 *                    including its terminating NUL
 * persistent       - selects directory, tdb flags and the health checks
 * unhealthy_reason - if set, recorded as the health state before opening
 * (one further parameter is declared on a line not visible here)
 *
 * Visible steps:
 *  - allocate the context, set priority 1, compute db_id, and for
 *    non-persistent databases create the delete queue;
 *  - refuse a db_id that collides with an already attached database;
 *  - record/load the persistent health state; an unhealthy database is
 *    refused when remaining_tries is 0 and only warned about otherwise;
 *  - build the path from directory, name and a numeric suffix, choose
 *    flags (TDB_DEFAULT for persistent, else CLEAR_IF_FIRST|NOSYNC, plus
 *    NOMMAP under valgrind, DISALLOW_NESTING and INCOMPATIBLE_HASH) and
 *    open the tdb;
 *  - on open or tdb_check() failure with tries remaining, back the file
 *    up via ctdb_backup_corrupted_tdb(); otherwise fail;
 *  - add the database to ctdb->db_list, set max dead, register the null
 *    and fetch calls and initialise vacuuming.
 *
 * NOTE(review): the failure paths after DLIST_ADD free ctdb_db; no
 * DLIST_REMOVE or destructor is visible here - confirm the list cannot
 * be left holding a freed entry.
 * NOTE(review): the retry loop structure, mode, st, fd and several
 * conditions are on lines not visible in this listing.
 */
658 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
659 bool persistent, const char *unhealthy_reason,
662 struct ctdb_db_context *ctdb_db, *tmp_db;
667 int remaining_tries = 0;
669 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
670 CTDB_NO_MEMORY(ctdb, ctdb_db);
672 ctdb_db->priority = 1;
673 ctdb_db->ctdb = ctdb;
674 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
675 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
677 key.dsize = strlen(db_name)+1;
678 key.dptr = discard_const(db_name);
679 ctdb_db->db_id = ctdb_hash(&key);
680 ctdb_db->persistent = persistent;
682 if (!ctdb_db->persistent) {
683 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
684 if (ctdb_db->delete_queue == NULL) {
685 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
688 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
691 /* check for hash collisions */
692 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
693 if (tmp_db->db_id == ctdb_db->db_id) {
694 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
695 tmp_db->db_id, db_name, tmp_db->db_name));
696 talloc_free(ctdb_db);
702 if (unhealthy_reason) {
703 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
704 unhealthy_reason, 0);
706 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
707 ctdb_db->db_name, unhealthy_reason, ret));
708 talloc_free(ctdb_db);
713 if (ctdb->max_persistent_check_errors > 0) {
716 if (ctdb->done_startup) {
720 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
722 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
723 ctdb_db->db_name, ret));
724 talloc_free(ctdb_db);
729 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
730 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
731 ctdb_db->db_name, ctdb_db->unhealthy_reason));
732 talloc_free(ctdb_db);
736 if (ctdb_db->unhealthy_reason) {
737 /* this is just a warning, but we want that in the log file! */
738 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
739 ctdb_db->db_name, ctdb_db->unhealthy_reason));
742 /* open the database */
743 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
744 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
747 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
748 if (ctdb->valgrinding) {
749 tdb_flags |= TDB_NOMMAP;
751 tdb_flags |= TDB_DISALLOW_NESTING;
753 tdb_flags |= TDB_INCOMPATIBLE_HASH;
757 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
758 ctdb->tunable.database_hash_size,
760 O_CREAT|O_RDWR, mode);
761 if (ctdb_db->ltdb == NULL) {
763 int saved_errno = errno;
766 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
769 strerror(saved_errno)));
770 talloc_free(ctdb_db);
774 if (remaining_tries == 0) {
775 DEBUG(DEBUG_CRIT,(__location__
776 "Failed to open persistent tdb '%s': %d - %s\n",
779 strerror(saved_errno)));
780 talloc_free(ctdb_db);
784 ret = stat(ctdb_db->db_path, &st);
786 DEBUG(DEBUG_CRIT,(__location__
787 "Failed to open persistent tdb '%s': %d - %s\n",
790 strerror(saved_errno)));
791 talloc_free(ctdb_db);
795 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
797 DEBUG(DEBUG_CRIT,(__location__
798 "Failed to open persistent tdb '%s': %d - %s\n",
801 strerror(saved_errno)));
802 talloc_free(ctdb_db);
812 ctdb_check_db_empty(ctdb_db);
814 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
819 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
820 ctdb_db->db_path, ret,
821 tdb_errorstr(ctdb_db->ltdb->tdb)));
822 if (remaining_tries == 0) {
823 talloc_free(ctdb_db);
827 fd = tdb_fd(ctdb_db->ltdb->tdb);
828 ret = fstat(fd, &st);
830 DEBUG(DEBUG_CRIT,(__location__
831 "Failed to fstat() persistent tdb '%s': %d - %s\n",
835 talloc_free(ctdb_db);
840 talloc_free(ctdb_db->ltdb);
841 ctdb_db->ltdb = NULL;
843 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
845 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
847 talloc_free(ctdb_db);
857 DLIST_ADD(ctdb->db_list, ctdb_db);
859 /* setting this can help some high churn databases */
860 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
863 all databases support the "null" function. we need this in
864 order to do forced migration of records
866 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
868 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
869 talloc_free(ctdb_db);
874 all databases support the "fetch" function. we need this
875 for efficient Samba3 ctdb fetch
877 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
879 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
880 talloc_free(ctdb_db);
884 ret = ctdb_vacuum_init(ctdb_db);
886 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
887 "database '%s'\n", ctdb_db->db_name));
888 talloc_free(ctdb_db);
893 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One deferred attach request: list linkage for ctdb->deferred_attach,
 * the owning daemon context and the control packet that was put on hold.
 */
900 struct ctdb_deferred_attach_context {
901 struct ctdb_deferred_attach_context *next, *prev;
902 struct ctdb_context *ctdb;
903 struct ctdb_req_control *c;
/*
 * talloc destructor installed by ctdb_control_db_attach(): unlinks the
 * entry from ctdb->deferred_attach.
 * NOTE(review): the return value is not visible in this listing.
 */
907 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
909 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler armed by ctdb_control_db_attach() for a deferred attach:
 * answers the held control with status -1.
 * NOTE(review): any cleanup of da_ctx after the reply is not visible in
 * this listing.
 */
914 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
916 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
917 struct ctdb_context *ctdb = da_ctx->ctdb;
919 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
/*
 * Timer handler scheduled by ctdb_process_deferred_attach(): feeds the
 * held control packet back into ctdb_input_pkt(), which per the inline
 * comment takes ownership of the packet.
 * NOTE(review): any cleanup of da_ctx afterwards is not visible in this
 * listing.
 */
923 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
925 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
926 struct ctdb_context *ctdb = da_ctx->ctdb;
928 /* This talloc-steals the packet ->c */
929 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
/*
 * Drain ctdb->deferred_attach: each entry is unlinked and a timed event
 * is added that runs ctdb_deferred_attach_callback() for it.
 * NOTE(review): the event is scheduled with timeval_current_ofs(1,0);
 * if the first argument is seconds this is a one second delay rather
 * than immediately after the current event - confirm against the
 * comment in the body.
 * NOTE(review): the result of event_add_timed() is not checked, and the
 * return value of this function is not visible in this listing.
 */
933 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
935 struct ctdb_deferred_attach_context *da_ctx;
937 /* call it from the main event loop as soon as the current event
940 while ((da_ctx = ctdb->deferred_attach) != NULL) {
941 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
942 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
949 a client has asked to attach a new database
/*
 * Control handler: a client asks to attach a database.
 *
 * indata     - database name
 * outdata    - set to point at the db_id stored in the database context
 * tdb_flags  - client supplied flags; masked down to TDB_NOSYNC and
 *              TDB_INCOMPATIBLE_HASH
 * persistent - which kind of database to attach
 * client_id  - non-zero for a local client; looked up via ctdb_reqid_find
 * c          - the control request, kept when the attach is deferred
 * (one further parameter is declared on a line not visible here)
 *
 * For a local client the attach is refused while the node is inactive,
 * and, while recovery is active, deferred for every client other than
 * the recovery daemon: the request is stolen onto a deferred-attach
 * entry and a timeout is armed. An already known database just gets the
 * flags added. A new one is attached locally, memory lockdown is
 * requested and the attach is broadcast to all nodes without reply.
 * NOTE(review): return values are not visible in this listing.
 */
951 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
952 TDB_DATA *outdata, uint64_t tdb_flags,
953 bool persistent, uint32_t client_id,
954 struct ctdb_req_control *c,
957 const char *db_name = (const char *)indata.dptr;
958 struct ctdb_db_context *db;
959 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
960 struct ctdb_client *client = NULL;
962 /* dont allow any local clients to attach while we are in recovery mode
963 * except for the recovery daemon.
964 * allow all attach from the network since these are always from remote
967 if (client_id != 0) {
968 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
970 if (client != NULL) {
971 /* If the node is inactive it is not part of the cluster
972 and we should not allow clients to attach to any
975 if (node->flags & NODE_FLAGS_INACTIVE) {
976 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
980 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
981 && client->pid != ctdb->recoverd_pid) {
982 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
984 if (da_ctx == NULL) {
985 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
990 da_ctx->c = talloc_steal(da_ctx, c);
991 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
992 DLIST_ADD(ctdb->deferred_attach, da_ctx);
994 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
996 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1002 /* the client can optionally pass additional tdb flags, but we
1003 only allow a subset of those on the database in ctdb. Note
1004 that tdb_flags is passed in via the (otherwise unused)
1005 srvid to the attach control */
1006 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1008 /* see if we already have this name */
1009 db = ctdb_db_handle(ctdb, db_name);
1011 outdata->dptr = (uint8_t *)&db->db_id;
1012 outdata->dsize = sizeof(db->db_id);
1013 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1017 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1021 db = ctdb_db_handle(ctdb, db_name);
1023 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1027 /* remember the flags the client has specified */
1028 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1030 outdata->dptr = (uint8_t *)&db->db_id;
1031 outdata->dsize = sizeof(db->db_id);
1033 /* Try to ensure it's locked in mem */
1034 ctdb_lockdown_memory(ctdb);
1036 /* tell all the other nodes about this database */
1037 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1038 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1039 CTDB_CONTROL_DB_ATTACH,
1040 0, CTDB_CTRL_FLAG_NOREPLY,
1041 indata, NULL, NULL);
1049 attach to all existing persistent databases
/*
 * Attach to all persistent databases found in
 * ctdb->db_directory_persistent.
 * A directory entry is used only if it contains .tdb. followed by
 * digits only and the number equals this node's pnn; everything else is
 * logged and ignored. unhealthy_reason is passed on to
 * ctdb_local_attach().
 * NOTE(review): strstr() finds the first .tdb. anywhere in the name,
 * which is looser than the ending-in wording of the inline comment.
 * NOTE(review): isdigit() is called on a plain char; the C library
 * expects a value representable as unsigned char (or EOF), so a cast to
 * unsigned char is advisable.
 * NOTE(review): freeing of s and closing of the directory are not
 * visible in this listing.
 */
1051 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1052 const char *unhealthy_reason)
1057 /* open the persistent db directory and scan it for files */
1058 d = opendir(ctdb->db_directory_persistent);
1063 while ((de=readdir(d))) {
1065 size_t len = strlen(de->d_name);
1067 int invalid_name = 0;
1069 s = talloc_strdup(ctdb, de->d_name);
1070 CTDB_NO_MEMORY(ctdb, s);
1072 /* only accept names ending in .tdb */
1073 p = strstr(s, ".tdb.");
1074 if (len < 7 || p == NULL) {
1079 /* only accept names ending with .tdb. and any number of digits */
1081 while (*q != 0 && invalid_name == 0) {
1082 if (!isdigit(*q++)) {
1086 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1087 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1093 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1094 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1100 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point for the local databases.
 * Fills in default directories under VARDIR when unset, creates the
 * three directories with mode 0700 (an existing directory is fine),
 * opens the persistent health tdb and runs tdb_check() on it, then
 * attaches all persistent databases.
 * If opening or checking the health tdb fails, the visible code builds
 * a WARNING reason string and reopens the file with TDB_CLEAR_IF_FIRST;
 * that reason is handed to ctdb_attach_persistent() so the databases get
 * marked for manual verification.
 * NOTE(review): the first_try logic, gotos and return values are on
 * lines not visible in this listing.
 */
1108 int ctdb_attach_databases(struct ctdb_context *ctdb)
1111 char *persistent_health_path = NULL;
1112 char *unhealthy_reason = NULL;
1113 bool first_try = true;
1115 if (ctdb->db_directory == NULL) {
1116 ctdb->db_directory = VARDIR "/ctdb";
1118 if (ctdb->db_directory_persistent == NULL) {
1119 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1121 if (ctdb->db_directory_state == NULL) {
1122 ctdb->db_directory_state = VARDIR "/ctdb/state";
1125 /* make sure the db directory exists */
1126 ret = mkdir(ctdb->db_directory, 0700);
1127 if (ret == -1 && errno != EEXIST) {
1128 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1129 ctdb->db_directory));
1133 /* make sure the persistent db directory exists */
1134 ret = mkdir(ctdb->db_directory_persistent, 0700);
1135 if (ret == -1 && errno != EEXIST) {
1136 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1137 ctdb->db_directory_persistent));
1141 /* make sure the internal state db directory exists */
1142 ret = mkdir(ctdb->db_directory_state, 0700);
1143 if (ret == -1 && errno != EEXIST) {
1144 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1145 ctdb->db_directory_state));
1149 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1150 ctdb->db_directory_state,
1151 PERSISTENT_HEALTH_TDB,
1153 if (persistent_health_path == NULL) {
1154 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1160 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1161 0, TDB_DISALLOW_NESTING,
1162 O_CREAT | O_RDWR, 0600);
1163 if (ctdb->db_persistent_health == NULL) {
1164 struct tdb_wrap *tdb;
1167 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1168 persistent_health_path,
1171 talloc_free(persistent_health_path);
1172 talloc_free(unhealthy_reason);
1177 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1178 persistent_health_path,
1179 "was cleared after a failure",
1180 "manual verification needed");
1181 if (unhealthy_reason == NULL) {
1182 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1183 talloc_free(persistent_health_path);
1187 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1188 persistent_health_path));
1189 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1190 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1191 O_CREAT | O_RDWR, 0600);
1193 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1194 persistent_health_path,
1197 talloc_free(persistent_health_path);
1198 talloc_free(unhealthy_reason);
1205 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1207 struct tdb_wrap *tdb;
1209 talloc_free(ctdb->db_persistent_health);
1210 ctdb->db_persistent_health = NULL;
1213 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1214 persistent_health_path));
1215 talloc_free(persistent_health_path);
1216 talloc_free(unhealthy_reason);
1221 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1222 persistent_health_path,
1223 "was cleared after a failure",
1224 "manual verification needed");
1225 if (unhealthy_reason == NULL) {
1226 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1227 talloc_free(persistent_health_path);
1231 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1232 persistent_health_path));
1233 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1234 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1235 O_CREAT | O_RDWR, 0600);
1237 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1238 persistent_health_path,
1241 talloc_free(persistent_health_path);
1242 talloc_free(unhealthy_reason);
1249 talloc_free(persistent_health_path);
1251 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1252 talloc_free(unhealthy_reason);
1261 called when a broadcast seqnum update comes in
/*
 * Handle a broadcast seqnum update for db_id sent by srcnode.
 * Updates originating from this node are ignored. An unknown db_id or a
 * database with an unhealthy_reason is logged as an error. On the
 * remaining path the tdb sequence number is incremented without
 * blocking and the cached ctdb_db->seqnum is refreshed.
 * NOTE(review): return values are not visible in this listing.
 */
1263 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1265 struct ctdb_db_context *ctdb_db;
1266 if (srcnode == ctdb->pnn) {
1267 /* don't update ourselves! */
1271 ctdb_db = find_ctdb_db(ctdb, db_id);
1273 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1277 if (ctdb_db->unhealthy_reason) {
1278 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1279 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1283 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1284 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1289 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Periodic timer: if the tdb sequence number differs from the cached
 * one, broadcast CTDB_CONTROL_UPDATE_SEQNUM carrying db_id (no reply
 * requested) and cache the new value. The timer is then re-armed;
 * tunable.seqnum_interval is split as milliseconds into seconds and
 * microseconds.
 */
1291 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1292 struct timeval t, void *p)
1294 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1295 struct ctdb_context *ctdb = ctdb_db->ctdb;
1296 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1297 if (new_seqnum != ctdb_db->seqnum) {
1298 /* something has changed - propagate it */
1300 data.dptr = (uint8_t *)&ctdb_db->db_id;
1301 data.dsize = sizeof(uint32_t);
1302 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1303 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1306 ctdb_db->seqnum = new_seqnum;
1308 /* setup a new timer */
1309 ctdb_db->seqnum_update =
1310 event_add_timed(ctdb->ev, ctdb_db,
1311 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1312 ctdb_ltdb_seqnum_check, ctdb_db);
1316 enable seqnum handling on this db
/*
 * Enable sequence number handling for db_id: start the periodic check
 * timer if it is not already running, enable seqnum in the tdb and cache
 * its current value. An unknown db_id is logged as an error.
 * NOTE(review): return values are not visible in this listing.
 */
1318 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1320 struct ctdb_db_context *ctdb_db;
1321 ctdb_db = find_ctdb_db(ctdb, db_id);
1323 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1327 if (ctdb_db->seqnum_update == NULL) {
1328 ctdb_db->seqnum_update =
1329 event_add_timed(ctdb->ev, ctdb_db,
1330 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1331 ctdb_ltdb_seqnum_check, ctdb_db);
1334 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1335 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: set the priority of a database.
 * indata is interpreted as struct ctdb_db_priority. An unknown db_id is
 * logged as an error, and priorities outside 1..NUM_DB_PRIORITIES are
 * rejected with an error log; otherwise the priority is stored in the
 * database context.
 * NOTE(review): no length check on indata is visible here - confirm the
 * caller validates it. Return values are not visible in this listing.
 */
1339 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1341 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1342 struct ctdb_db_context *ctdb_db;
1344 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1346 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1350 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1351 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1355 ctdb_db->priority = db_prio->priority;
1356 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));