2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 * write a record to a normal database
38 * This is the server-variant of the ctdb_ltdb_store function.
39 * It contains logic to determine whether a record should be
40 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store routine for a local tdb record.
 *
 * From the data size, the record header flags and the lmaster / dmaster
 * relationship it decides whether the record is written with tdb_store()
 * or removed with tdb_delete(), and whether the key is afterwards
 * scheduled for deletion or removed from the delete queue.
 *
 * NOTE(review): this listing is missing short lines of the function
 * (remaining parameters, declarations of rec/ret/keep/lmaster, the keep
 * assignments, several conditions and the returns). Consult the complete
 * file before changing the decision chain.
 */
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
45 struct ctdb_ltdb_header *header,
48 struct ctdb_context *ctdb = ctdb_db->ctdb;
51 bool seqnum_suppressed = false;
53 bool schedule_for_deletion = false;
54 bool remove_from_delete_queue = false;
/* Torture mode only: report when the stored RSN is higher than the new one. */
57 if (ctdb->flags & CTDB_FLAG_TORTURE) {
58 struct ctdb_ltdb_header *h2;
59 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
60 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/*
 * NOTE(review): sizeof(h2) is the size of a pointer, not of the header
 * structure; sizeof(*h2) was presumably intended - confirm and fix in
 * the complete file.
 */
61 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
62 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
63 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
65 if (rec.dptr) free(rec.dptr);
68 if (ctdb->vnn_map == NULL) {
70 * Called from a client: always store the record
71 * Also don't call ctdb_lmaster since it uses the vnn_map!
77 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
80 * If we migrate an empty record off to another node
81 * and the record has not been migrated with data,
82 * delete the record instead of storing the empty record.
84 if (data.dsize != 0) {
86 } else if (ctdb_db->persistent) {
88 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
90 * The record is not created by the client but
91 * automatically by the ctdb_ltdb_fetch logic that
92 * creates a record with an initial header in the
93 * ltdb before trying to migrate the record from
94 * the current lmaster. Keep it instead of trying
95 * to delete the non-existing record...
98 schedule_for_deletion = true;
99 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
101 } else if (ctdb_db->ctdb->pnn == lmaster) {
103 * If we are lmaster, then we usually keep the record.
104 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
105 * and the record is empty and has never been migrated
106 * with data, then we should delete it instead of storing it.
107 * This is part of the vacuuming process.
109 * The reason that we usually need to store even empty records
110 * on the lmaster is that a client operating directly on the
111 * lmaster (== dmaster) expects the local copy of the record to
112 * exist after successful ctdb migrate call. If the record does
113 * not exist, the client goes into a migrate loop and eventually
114 * fails. So storing the empty record makes sure that we do not
115 * need to change the client code.
117 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
119 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
122 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
/*
 * An empty record in a non-persistent db for which this node is dmaster
 * is scheduled for deletion; otherwise the key leaves the delete queue.
 */
127 if ((data.dsize == 0) &&
128 !ctdb_db->persistent &&
129 (ctdb_db->ctdb->pnn == header->dmaster))
131 schedule_for_deletion = true;
133 remove_from_delete_queue = !schedule_for_deletion;
138 * The VACUUM_MIGRATED flag is only set temporarily for
139 * the above logic when the record was retrieved by a
140 * VACUUM_MIGRATE call and should not be stored in the
143 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
144 * and there are two cases in which the corresponding record
145 * is stored in the local database:
146 * 1. The record has been migrated with data in the past
147 * (the MIGRATED_WITH_DATA record flag is set).
148 * 2. The record has been filled with data again since it
149 * had been submitted in the VACUUM_FETCH message to the
151 * For such records it is important to not store the
152 * VACUUM_MIGRATED flag in the database.
154 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
157 * Similarly, clear the AUTOMATIC flag which should not enter
158 * the local database copy since this would require client
159 * modifications to clear the flag when the client stores
162 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
/* Build the stored record: the header immediately followed by the data. */
164 rec.dsize = sizeof(*header) + data.dsize;
165 rec.dptr = talloc_size(ctdb, rec.dsize);
166 CTDB_NO_MEMORY(ctdb, rec.dptr);
168 memcpy(rec.dptr, header, sizeof(*header));
169 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
171 /* Databases with seqnum updates enabled only get their seqnum
172 changes when/if we modify the data */
173 if (ctdb_db->seqnum_update != NULL) {
175 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
/*
 * Same size and identical payload after the header: drop TDB_SEQNUM for
 * this write; the flag is added back further down.
 */
177 if ( (old.dsize == rec.dsize)
178 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
179 rec.dptr+sizeof(struct ctdb_ltdb_header),
180 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
181 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
182 seqnum_suppressed = true;
184 if (old.dptr) free(old.dptr);
187 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
189 keep?"storing":"deleting",
193 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
195 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
202 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
207 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
210 keep?"store":"delete", ret,
211 tdb_errorstr(ctdb_db->ltdb->tdb)));
/* Presumably on the failure path only: skip both queue operations below. */
213 schedule_for_deletion = false;
214 remove_from_delete_queue = false;
216 if (seqnum_suppressed) {
217 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
220 talloc_free(rec.dptr);
222 if (schedule_for_deletion) {
224 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
226 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
230 if (remove_from_delete_queue) {
231 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
/*
 * State carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
 * while a request waits for a chainlock.
 *
 * NOTE(review): the members recv_context and generation are used by the
 * code in this file but their declarations are missing from this listing.
 */
237 struct lock_fetch_state {
238 struct ctdb_context *ctdb;
239 void (*recv_pkt)(void *, struct ctdb_req_header *);
241 struct ctdb_req_header *hdr;
243 bool ignore_generation;
247 called when we should retry the operation
/*
 * Lockwait completion callback; p is the lock_fetch_state.
 *
 * Unless ignore_generation is set, a packet whose recorded generation no
 * longer matches the current vnn_map generation is logged as discarded
 * and its header is freed. Otherwise the packet is passed to recv_pkt.
 *
 * NOTE(review): the return that should follow the discard is not visible
 * in this listing - confirm it exists in the complete file.
 */
249 static void lock_fetch_callback(void *p)
251 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
252 if (!state->ignore_generation &&
253 state->generation != state->ctdb->vnn_map->generation) {
254 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
255 talloc_free(state->hdr);
258 state->recv_pkt(state->recv_context, state->hdr);
259 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
264 do a non-blocking ltdb_lock, deferring this ctdb request until we
267 It does the following:
269 1) tries to get the chainlock. If it succeeds, then it returns 0
271 2) if it fails to get a chainlock immediately then it sets up a
272 non-blocking chainlock via ctdb_lockwait, and when it gets the
273 chainlock it re-submits this ctdb request to the main packet
276 This effectively queues all ctdb requests that cannot be
277 immediately satisfied until it can get the lock. This means that
278 the main ctdb daemon will not block waiting for a chainlock held by
281 There are 3 possible return values:
283 0: means that it got the lock immediately.
284 -1: means that it failed to get the lock, and won't retry
285 -2: means that it failed to get the lock immediately, but will retry
/*
 * Take the chainlock for key without blocking, or arrange a retry.
 *
 * tdb_chainlock_nonblock() is tried first. On contention a
 * lock_fetch_state is allocated as a talloc child of hdr, filled with the
 * callback, its context and the current vnn_map generation, and handed to
 * ctdb_lockwait() together with lock_fetch_callback. hdr and the lockwait
 * handle are then re-parented onto that state.
 *
 * NOTE(review): the return statements and several condition lines are
 * missing from this listing, so the exact mapping to return values cannot
 * be confirmed from here.
 */
287 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
288 TDB_DATA key, struct ctdb_req_header *hdr,
289 void (*recv_pkt)(void *, struct ctdb_req_header *),
290 void *recv_context, bool ignore_generation)
293 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
294 struct lockwait_handle *h;
295 struct lock_fetch_state *state;
297 ret = tdb_chainlock_nonblock(tdb, key);
/* Only EACCES, EAGAIN and EDEADLK are treated as contention. */
300 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
301 /* a hard failure - don't try again */
305 /* when torturing, ensure we test the contended path */
306 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
309 tdb_chainunlock(tdb, key);
312 /* first the non-contended path */
317 state = talloc(hdr, struct lock_fetch_state);
318 state->ctdb = ctdb_db->ctdb;
320 state->recv_pkt = recv_pkt;
321 state->recv_context = recv_context;
322 state->generation = ctdb_db->ctdb->vnn_map->generation;
323 state->ignore_generation = ignore_generation;
325 /* now the contended path */
326 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
331 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
332 so it won't be freed yet */
333 talloc_steal(state, hdr);
334 talloc_steal(state, h);
336 /* now tell the caller that we will retry asynchronously */
341 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Wrapper around ctdb_ltdb_lock_requeue() that also reads the record with
 * ctdb_ltdb_fetch() into header and data. A failing ctdb_ltdb_unlock() is
 * logged together with its error code.
 *
 * NOTE(review): the conditions that guard the fetch and the unlock, and
 * the final return, are missing from this listing.
 */
343 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
344 TDB_DATA key, struct ctdb_ltdb_header *header,
345 struct ctdb_req_header *hdr, TDB_DATA *data,
346 void (*recv_pkt)(void *, struct ctdb_req_header *),
347 void *recv_context, bool ignore_generation)
351 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
352 recv_context, ignore_generation);
354 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
357 uret = ctdb_ltdb_unlock(ctdb_db, key);
359 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
368 paranoid check to see if the db is empty
/*
 * Sanity check used on attach: the result of tdb_traverse_read() with no
 * callback is taken as the record count, and the daemon is stopped through
 * ctdb_fatal() with a not-empty message.
 *
 * NOTE(review): the condition on count is missing from this listing;
 * presumably the fatal path runs only for a positive count.
 */
370 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
372 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
373 int count = tdb_traverse_read(tdb, NULL, NULL);
375 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
377 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Reload ctdb_db->unhealthy_reason from the persistent health tdb.
 *
 * The lookup key is the database name without its terminating NUL
 * (dsize is strlen of the name). The fetched value is copied with
 * talloc_strndup() onto ctdb_db. If that copy fails the previous reason
 * pointer is put back.
 *
 * NOTE(review): the handling of a missing record, the release of val.dptr
 * and of the old reason, and the returns are missing from this listing.
 */
381 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
382 struct ctdb_db_context *ctdb_db)
384 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
390 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
391 key.dsize = strlen(ctdb_db->db_name);
/* Detach the current reason so it can be restored on failure. */
393 old = ctdb_db->unhealthy_reason;
394 ctdb_db->unhealthy_reason = NULL;
396 val = tdb_fetch(tdb, key);
398 reason = talloc_strndup(ctdb_db,
399 (const char *)val.dptr,
401 if (reason == NULL) {
402 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
404 ctdb_db->unhealthy_reason = old;
415 ctdb_db->unhealthy_reason = reason;
/*
 * Update the health record of ctdb_db inside one transaction on the
 * persistent health tdb.
 *
 * given_reason      new unhealthy reason, NULL means healthy
 * num_healthy_nodes when 0 and an old reason exists, the old reason is
 *                   kept and rewritten with the NO-HEALTHY-NODES prefix
 *                   handling shown below
 *
 * The current reason is reloaded first. A new reason is written with
 * tdb_store(); with no new reason but an old one, the record is removed
 * with tdb_delete(). Store and delete failures cancel the transaction.
 * After a successful commit the old reason is freed and
 * ctdb_db->unhealthy_reason is set to the new reason.
 *
 * NOTE(review): several condition lines and all returns are missing from
 * this listing.
 */
419 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
420 struct ctdb_db_context *ctdb_db,
421 const char *given_reason,/* NULL means healthy */
422 int num_healthy_nodes)
424 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
428 char *new_reason = NULL;
429 char *old_reason = NULL;
431 ret = tdb_transaction_start(tdb);
433 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
434 tdb_name(tdb), ret, tdb_errorstr(tdb)));
438 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
440 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
441 ctdb_db->db_name, ret));
444 old_reason = ctdb_db->unhealthy_reason;
/* Same key layout as the loader: database name without the NUL. */
446 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
447 key.dsize = strlen(ctdb_db->db_name);
450 new_reason = talloc_strdup(ctdb_db, given_reason);
451 if (new_reason == NULL) {
452 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
456 } else if (old_reason && num_healthy_nodes == 0) {
458 * If the reason indicates ok, but there where no healthy nodes
459 * available, that it means, we have not recovered valid content
460 * of the db. So if there's an old reason, prefix it with
461 * "NO-HEALTHY-NODES - "
465 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
466 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
468 prefix = _TMP_PREFIX;
472 new_reason = talloc_asprintf(ctdb_db, "%s%s",
474 if (new_reason == NULL) {
475 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
476 prefix, old_reason));
/* The stored value is the reason text without its terminating NUL. */
483 val.dptr = discard_const_p(uint8_t, new_reason);
484 val.dsize = strlen(new_reason);
486 ret = tdb_store(tdb, key, val, TDB_REPLACE);
488 tdb_transaction_cancel(tdb);
489 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
490 tdb_name(tdb), ctdb_db->db_name, new_reason,
491 ret, tdb_errorstr(tdb)));
492 talloc_free(new_reason);
495 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
496 ctdb_db->db_name, new_reason));
497 } else if (old_reason) {
498 ret = tdb_delete(tdb, key);
500 tdb_transaction_cancel(tdb);
501 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
502 tdb_name(tdb), ctdb_db->db_name,
503 ret, tdb_errorstr(tdb)));
504 talloc_free(new_reason);
507 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
511 ret = tdb_transaction_commit(tdb);
512 if (ret != TDB_SUCCESS) {
513 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
514 tdb_name(tdb), ret, tdb_errorstr(tdb)));
515 talloc_free(new_reason);
/* Committed: the context now owns new_reason, which may be NULL. */
519 talloc_free(old_reason);
520 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted database file out of the way.
 *
 * A new path of the form <db_path>.corrupted.<timestamp>.0Z is built, an
 * ERROR reason naming that path is recorded through
 * ctdb_update_persistent_health() with zero healthy nodes, and the file
 * at ctdb_db->db_path is renamed to the new path.
 *
 * NOTE(review): the lines that fill tm, the failure conditions and the
 * returns are missing from this listing.
 */
525 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
526 struct ctdb_db_context *ctdb_db)
528 time_t now = time(NULL);
536 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
537 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
538 "%04u%02u%02u%02u%02u%02u.0Z",
540 tm->tm_year+1900, tm->tm_mon+1,
541 tm->tm_mday, tm->tm_hour, tm->tm_min,
543 if (new_path == NULL) {
544 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
548 new_reason = talloc_asprintf(ctdb_db,
549 "ERROR - Backup of corrupted TDB in '%s'",
551 if (new_reason == NULL) {
552 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
555 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
556 talloc_free(new_reason);
/*
 * NOTE(review): this message says not implemented yet although the rename
 * is performed below; it presumably reports a failed health update and
 * the text looks stale - confirm against the complete file.
 */
558 DEBUG(DEBUG_CRIT,(__location__
559 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
564 ret = rename(ctdb_db->db_path, new_path);
566 DEBUG(DEBUG_CRIT,(__location__
567 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
568 ctdb_db->db_path, new_path,
569 errno, strerror(errno)));
570 talloc_free(new_path);
574 DEBUG(DEBUG_CRIT,(__location__
575 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
576 ctdb_db->db_path, new_path));
577 talloc_free(new_path);
/*
 * Walk ctdb->db_list and reload the health state of every persistent
 * database. Databases without an unhealthy reason are logged as healthy,
 * the others as unhealthy with their reason, and a summary with OK and
 * FAIL counts is logged at the end (at ALERT level when fail is non-zero).
 *
 * NOTE(review): the counter declarations and updates, the continue for
 * non-persistent databases and the returns are missing from this listing.
 */
581 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
583 struct ctdb_db_context *ctdb_db;
588 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
589 if (!ctdb_db->persistent) {
593 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
595 DEBUG(DEBUG_ALERT,(__location__
596 " load persistent health for '%s' failed\n",
601 if (ctdb_db->unhealthy_reason == NULL) {
603 DEBUG(DEBUG_INFO,(__location__
604 " persistent db '%s' healthy\n",
610 DEBUG(DEBUG_ALERT,(__location__
611 " persistent db '%s' unhealthy: %s\n",
613 ctdb_db->unhealthy_reason));
615 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
616 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
628 mark a database - as healthy
/*
 * Control handler: mark the database whose id is carried in indata as
 * healthy by calling ctdb_update_persistent_health() with a NULL reason
 * and one healthy node. If the database had an unhealthy reason and
 * startup is not finished, recovery mode is switched to active.
 *
 * NOTE(review): db_id is read straight from indata.dptr and no length
 * check is visible here; presumably the control dispatcher validates the
 * size - confirm. The returns and the assignment to may_recover are
 * missing from this listing.
 */
630 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
632 uint32_t db_id = *(uint32_t *)indata.dptr;
633 struct ctdb_db_context *ctdb_db;
635 bool may_recover = false;
637 ctdb_db = find_ctdb_db(ctdb, db_id);
639 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
643 if (ctdb_db->unhealthy_reason) {
647 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
649 DEBUG(DEBUG_ERR,(__location__
650 " ctdb_update_persistent_health(%s) failed\n",
655 if (may_recover && !ctdb->done_startup) {
656 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
658 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the health of the database whose id is carried
 * in indata. The reason is reloaded from the persistent health tdb; when
 * one is set, outdata points at that string and its size includes the
 * terminating NUL.
 *
 * NOTE(review): outdata->dptr aliases ctdb_db->unhealthy_reason rather
 * than a copy. No length check on indata is visible here; presumably the
 * dispatcher validates it - confirm. The remaining parameters, the
 * healthy branch and the returns are missing from this listing.
 */
664 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
668 uint32_t db_id = *(uint32_t *)indata.dptr;
669 struct ctdb_db_context *ctdb_db;
672 ctdb_db = find_ctdb_db(ctdb, db_id);
674 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
678 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
680 DEBUG(DEBUG_ERR,(__location__
681 " ctdb_load_persistent_health(%s) failed\n",
687 if (ctdb_db->unhealthy_reason) {
688 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
689 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enable the readonly property on a database by opening a tracking tdb
 * at <db_path>.RO and setting ctdb_db->readonly.
 *
 * A database that already has the property is handled by the first
 * check; persistent databases are rejected with an error message.
 *
 * NOTE(review): the returns, the remaining tdb_open arguments and any
 * release of ropath are missing from this listing.
 */
696 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
700 if (ctdb_db->readonly) {
704 if (ctdb_db->persistent) {
705 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
709 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
710 if (ropath == NULL) {
711 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
714 ctdb_db->rottdb = tdb_open(ropath,
715 ctdb->tunable.database_hash_size,
716 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
718 if (ctdb_db->rottdb == NULL) {
719 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
724 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
726 ctdb_db->readonly = true;
732 attach to a database, handling both persistent and non-persistent databases
733 return 0 on success, -1 on failure
/*
 * Create and register the local context for one database.
 *
 * Visible steps, in order:
 *  - allocate a zeroed ctdb_db_context, default priority 1, copy the name
 *  - derive db_id with ctdb_hash() over the name including its NUL
 *  - non-persistent databases get a delete_queue tree and
 *    ctdb_ltdb_store_server as store function
 *  - reject a db_id that is already present in ctdb->db_list
 *  - record a given unhealthy_reason, reload the persistent health state
 *    and refuse an unhealthy database once remaining_tries is 0
 *  - build the path <directory>/<name>.<n> and open it with tdb_wrap_open()
 *  - run tdb_check(); a failing file is backed up with
 *    ctdb_backup_corrupted_tdb()
 *  - create the deferred_fetch tree, link the context into db_list and
 *    register the null, fetch and fetch_with_header calls
 *  - start vacuuming with ctdb_vacuum_init()
 *
 * Every visible failure path frees ctdb_db.
 *
 * NOTE(review): many condition lines, the retry jumps and all returns
 * are missing from this listing; the control flow between the steps
 * cannot be confirmed from here.
 */
735 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
736 bool persistent, const char *unhealthy_reason,
739 struct ctdb_db_context *ctdb_db, *tmp_db;
744 int remaining_tries = 0;
746 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
747 CTDB_NO_MEMORY(ctdb, ctdb_db);
749 ctdb_db->priority = 1;
750 ctdb_db->ctdb = ctdb;
751 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
752 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* The id hash covers the name and its terminating NUL. */
754 key.dsize = strlen(db_name)+1;
755 key.dptr = discard_const(db_name);
756 ctdb_db->db_id = ctdb_hash(&key);
757 ctdb_db->persistent = persistent;
759 if (!ctdb_db->persistent) {
760 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
761 if (ctdb_db->delete_queue == NULL) {
762 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
765 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
768 /* check for hash collisions */
769 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
770 if (tmp_db->db_id == ctdb_db->db_id) {
771 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
772 tmp_db->db_id, db_name, tmp_db->db_name));
773 talloc_free(ctdb_db);
779 if (unhealthy_reason) {
780 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
781 unhealthy_reason, 0);
783 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
784 ctdb_db->db_name, unhealthy_reason, ret));
785 talloc_free(ctdb_db);
790 if (ctdb->max_persistent_check_errors > 0) {
793 if (ctdb->done_startup) {
797 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
799 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
800 ctdb_db->db_name, ret));
801 talloc_free(ctdb_db);
806 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
807 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
808 ctdb_db->db_name, ctdb_db->unhealthy_reason));
809 talloc_free(ctdb_db);
813 if (ctdb_db->unhealthy_reason) {
814 /* this is just a warning, but we want that in the log file! */
815 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
816 ctdb_db->db_name, ctdb_db->unhealthy_reason));
819 /* open the database */
820 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
821 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
/* Persistent databases use default flags; the others are cleared on first open and skip sync. */
824 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
825 if (ctdb->valgrinding) {
826 tdb_flags |= TDB_NOMMAP;
828 tdb_flags |= TDB_DISALLOW_NESTING;
830 tdb_flags |= TDB_INCOMPATIBLE_HASH;
834 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
835 ctdb->tunable.database_hash_size,
837 O_CREAT|O_RDWR, mode);
838 if (ctdb_db->ltdb == NULL) {
/* errno is saved at once so the later log lines report the open failure. */
840 int saved_errno = errno;
843 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
846 strerror(saved_errno)));
847 talloc_free(ctdb_db);
851 if (remaining_tries == 0) {
852 DEBUG(DEBUG_CRIT,(__location__
853 "Failed to open persistent tdb '%s': %d - %s\n",
856 strerror(saved_errno)));
857 talloc_free(ctdb_db);
861 ret = stat(ctdb_db->db_path, &st);
863 DEBUG(DEBUG_CRIT,(__location__
864 "Failed to open persistent tdb '%s': %d - %s\n",
867 strerror(saved_errno)));
868 talloc_free(ctdb_db);
872 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
874 DEBUG(DEBUG_CRIT,(__location__
875 "Failed to open persistent tdb '%s': %d - %s\n",
878 strerror(saved_errno)));
879 talloc_free(ctdb_db);
889 ctdb_check_db_empty(ctdb_db);
891 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
896 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
897 ctdb_db->db_path, ret,
898 tdb_errorstr(ctdb_db->ltdb->tdb)));
899 if (remaining_tries == 0) {
900 talloc_free(ctdb_db);
904 fd = tdb_fd(ctdb_db->ltdb->tdb);
905 ret = fstat(fd, &st);
907 DEBUG(DEBUG_CRIT,(__location__
908 "Failed to fstat() persistent tdb '%s': %d - %s\n",
912 talloc_free(ctdb_db);
/* The handle is released before the corrupted file is renamed. */
917 talloc_free(ctdb_db->ltdb);
918 ctdb_db->ltdb = NULL;
920 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
922 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
924 talloc_free(ctdb_db);
934 /* set up a rb tree we can use to track which records we have a
935 fetch-lock in-flight for so we can defer any additional calls
938 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
939 if (ctdb_db->deferred_fetch == NULL) {
940 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
941 talloc_free(ctdb_db);
945 DLIST_ADD(ctdb->db_list, ctdb_db);
947 /* setting this can help some high churn databases */
948 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
951 all databases support the "null" function. we need this in
952 order to do forced migration of records
954 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
956 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
957 talloc_free(ctdb_db);
962 all databases support the "fetch" function. we need this
963 for efficient Samba3 ctdb fetch
965 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
967 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
968 talloc_free(ctdb_db);
973 all databases support the "fetch_with_header" function. we need this
974 for efficient readonly record fetches
976 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
978 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
979 talloc_free(ctdb_db);
983 ret = ctdb_vacuum_init(ctdb_db);
985 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
986 "database '%s'\n", ctdb_db->db_name));
987 talloc_free(ctdb_db);
992 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One deferred attach request: a node of the ctdb->deferred_attach list
 * that holds the owning ctdb context and the postponed control packet c.
 */
999 struct ctdb_deferred_attach_context {
1000 struct ctdb_deferred_attach_context *next, *prev;
1001 struct ctdb_context *ctdb;
1002 struct ctdb_req_control *c;
/*
 * talloc destructor for a deferred attach context: unlinks the context
 * from the ctdb->deferred_attach list.
 *
 * NOTE(review): the return statement is missing from this listing.
 */
1006 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1008 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timer handler for a deferred attach that was not processed in time:
 * replies to the postponed control with status -1 and frees the context.
 * private_data is the ctdb_deferred_attach_context.
 */
1013 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1015 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1016 struct ctdb_context *ctdb = da_ctx->ctdb;
1018 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1019 talloc_free(da_ctx);
/*
 * Timer handler that replays a deferred attach: the postponed control
 * packet is fed back into ctdb_input_pkt() and the context is freed.
 * private_data is the ctdb_deferred_attach_context.
 */
1022 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1024 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1025 struct ctdb_context *ctdb = da_ctx->ctdb;
1027 /* This talloc-steals the packet ->c */
1028 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1029 talloc_free(da_ctx);
/*
 * Drain the ctdb->deferred_attach list: each entry is unlinked and a
 * timed event with a one second offset is added that runs
 * ctdb_deferred_attach_callback for it. The event is allocated on the
 * entry itself.
 *
 * NOTE(review): the end of the comment inside the function and the
 * return are missing from this listing.
 */
1032 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1034 struct ctdb_deferred_attach_context *da_ctx;
1036 /* call it from the main event loop as soon as the current event
1039 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1040 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1041 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1048 a client has asked to attach a new database
// Control handler for a request to attach to the database named in indata.
//
// Visible behaviour:
//  - the request is refused when the allow_client_db_attach tunable is 0
//  - for a known local client: refused while this node is inactive, and
//    deferred (packet stolen onto a deferred attach context, timeout event
//    armed, *async_reply set) while recovery is active, startup is not
//    done and the caller is not the recovery daemon
//  - client supplied tdb_flags are reduced to TDB_NOSYNC and
//    TDB_INCOMPATIBLE_HASH
//  - an already attached database only gets the flags added and its id
//    returned; otherwise ctdb_local_attach() is called, the id is
//    returned and the attach is broadcast to all nodes without reply
//
// NOTE(review): the remaining parameters, several conditions and all
// returns are missing from this listing. Line comments are used here on
// purpose: a block comment opened earlier in this listing has lost its
// closing line.
1050 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1051 TDB_DATA *outdata, uint64_t tdb_flags,
1052 bool persistent, uint32_t client_id,
1053 struct ctdb_req_control *c,
1056 const char *db_name = (const char *)indata.dptr;
1057 struct ctdb_db_context *db;
1058 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1059 struct ctdb_client *client = NULL;
// NOTE(review): the message below names AllowClientDBAccess while the
// tunable field is allow_client_db_attach - confirm the user visible name.
1061 if (ctdb->tunable.allow_client_db_attach == 0) {
1062 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1063 "AllowClientDBAccess == 0\n", db_name));
1067 /* dont allow any local clients to attach while we are in recovery mode
1068 * except for the recovery daemon.
1069 * allow all attach from the network since these are always from remote
1072 if (client_id != 0) {
1073 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1075 if (client != NULL) {
1076 /* If the node is inactive it is not part of the cluster
1077 and we should not allow clients to attach to any
1080 if (node->flags & NODE_FLAGS_INACTIVE) {
1081 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1085 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1086 && client->pid != ctdb->recoverd_pid
1087 && !ctdb->done_startup) {
1088 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1090 if (da_ctx == NULL) {
1091 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1095 da_ctx->ctdb = ctdb;
1096 da_ctx->c = talloc_steal(da_ctx, c);
1097 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1098 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1100 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1102 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1103 *async_reply = true;
1108 /* the client can optionally pass additional tdb flags, but we
1109 only allow a subset of those on the database in ctdb. Note
1110 that tdb_flags is passed in via the (otherwise unused)
1111 srvid to the attach control */
1112 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1114 /* see if we already have this name */
1115 db = ctdb_db_handle(ctdb, db_name);
/* outdata points at the db_id member of the context; it is not a copy. */
1117 outdata->dptr = (uint8_t *)&db->db_id;
1118 outdata->dsize = sizeof(db->db_id);
1119 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1123 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1127 db = ctdb_db_handle(ctdb, db_name);
1129 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1133 /* remember the flags the client has specified */
1134 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1136 outdata->dptr = (uint8_t *)&db->db_id;
1137 outdata->dsize = sizeof(db->db_id);
1139 /* Try to ensure it's locked in mem */
1140 ctdb_lockdown_memory(ctdb);
1142 /* tell all the other nodes about this database */
1143 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1144 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1145 CTDB_CONTROL_DB_ATTACH,
1146 0, CTDB_CTRL_FLAG_NOREPLY,
1147 indata, NULL, NULL);
1155 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach every persistent
 * database file that belongs to this node.
 *
 * An entry is used when its name contains .tdb. followed only by digits
 * and the number parsed after that marker equals ctdb->pnn. Other entries
 * are logged as ignored. Accepted names are passed to
 * ctdb_local_attach() as persistent, with the given unhealthy_reason.
 *
 * NOTE(review): the declarations, the line that truncates or assigns q,
 * the release of s and of the directory handle, and the returns are
 * missing from this listing.
 */
1157 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1158 const char *unhealthy_reason)
1163 /* open the persistent db directory and scan it for files */
1164 d = opendir(ctdb->db_directory_persistent);
1169 while ((de=readdir(d))) {
1171 size_t len = strlen(de->d_name);
1173 int invalid_name = 0;
1175 s = talloc_strdup(ctdb, de->d_name);
1176 CTDB_NO_MEMORY(ctdb, s);
1178 /* only accept names ending in .tdb */
1179 p = strstr(s, ".tdb.");
1180 if (len < 7 || p == NULL) {
1185 /* only accept names ending with .tdb. and any number of digits */
/*
 * NOTE(review): if q points to plain char, isdigit() should receive the
 * value cast to unsigned char - confirm the type of q.
 */
1187 while (*q != 0 && invalid_name == 0) {
1188 if (!isdigit(*q++)) {
1192 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1193 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1199 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1200 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1206 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point for the local databases.
 *
 * Visible steps, in order:
 *  - fill in default directories below VARDIR for any that are unset
 *  - create the three directories with mode 0700, tolerating EEXIST
 *  - open the persistent health tdb in the state directory; when the
 *    open or the following tdb_check() fails, an unhealthy reason is
 *    prepared and the open is retried with TDB_CLEAR_IF_FIRST
 *  - attach all persistent databases, passing that reason along
 *
 * NOTE(review): the two retry sections are near duplicates. The use of
 * first_try, the jump back after a retry and all returns are missing
 * from this listing.
 */
1214 int ctdb_attach_databases(struct ctdb_context *ctdb)
1217 char *persistent_health_path = NULL;
1218 char *unhealthy_reason = NULL;
1219 bool first_try = true;
1221 if (ctdb->db_directory == NULL) {
1222 ctdb->db_directory = VARDIR "/ctdb";
1224 if (ctdb->db_directory_persistent == NULL) {
1225 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1227 if (ctdb->db_directory_state == NULL) {
1228 ctdb->db_directory_state = VARDIR "/ctdb/state";
1231 /* make sure the db directory exists */
1232 ret = mkdir(ctdb->db_directory, 0700);
1233 if (ret == -1 && errno != EEXIST) {
1234 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1235 ctdb->db_directory));
1239 /* make sure the persistent db directory exists */
1240 ret = mkdir(ctdb->db_directory_persistent, 0700);
1241 if (ret == -1 && errno != EEXIST) {
1242 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1243 ctdb->db_directory_persistent));
1247 /* make sure the internal state db directory exists */
1248 ret = mkdir(ctdb->db_directory_state, 0700);
1249 if (ret == -1 && errno != EEXIST) {
1250 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1251 ctdb->db_directory_state));
1255 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1256 ctdb->db_directory_state,
1257 PERSISTENT_HEALTH_TDB,
1259 if (persistent_health_path == NULL) {
1260 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1266 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1267 0, TDB_DISALLOW_NESTING,
1268 O_CREAT | O_RDWR, 0600);
/* Open failed: log, prepare a warning reason, then retry with a cleared tdb. */
1269 if (ctdb->db_persistent_health == NULL) {
1270 struct tdb_wrap *tdb;
1273 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1274 persistent_health_path,
1277 talloc_free(persistent_health_path);
1278 talloc_free(unhealthy_reason);
1283 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1284 persistent_health_path,
1285 "was cleared after a failure",
1286 "manual verification needed");
1287 if (unhealthy_reason == NULL) {
1288 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1289 talloc_free(persistent_health_path);
1293 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1294 persistent_health_path));
1295 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1296 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1297 O_CREAT | O_RDWR, 0600);
1299 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1300 persistent_health_path,
1303 talloc_free(persistent_health_path);
1304 talloc_free(unhealthy_reason);
/* Same recovery sequence when the opened health tdb fails its consistency check. */
1311 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1313 struct tdb_wrap *tdb;
1315 talloc_free(ctdb->db_persistent_health);
1316 ctdb->db_persistent_health = NULL;
1319 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1320 persistent_health_path));
1321 talloc_free(persistent_health_path);
1322 talloc_free(unhealthy_reason);
1327 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1328 persistent_health_path,
1329 "was cleared after a failure",
1330 "manual verification needed");
1331 if (unhealthy_reason == NULL) {
1332 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1333 talloc_free(persistent_health_path);
1337 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1338 persistent_health_path));
1339 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1340 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1341 O_CREAT | O_RDWR, 0600);
1343 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1344 persistent_health_path,
1347 talloc_free(persistent_health_path);
1348 talloc_free(unhealthy_reason);
1355 talloc_free(persistent_health_path);
1357 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1358 talloc_free(unhealthy_reason);
1367 called when a broadcast seqnum update comes in
/*
 * Handle a seqnum update broadcast for db_id sent by srcnode.
 *
 * Updates that originate from this node are not applied. For a known
 * database the tdb sequence number is incremented with the nonblocking
 * variant and the cached ctdb_db->seqnum is refreshed. An unknown id and
 * an unhealthy database are both logged.
 *
 * NOTE(review): the returns are missing from this listing, so the status
 * reported for each case cannot be confirmed from here.
 */
1369 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1371 struct ctdb_db_context *ctdb_db;
1372 if (srcnode == ctdb->pnn) {
1373 /* don't update ourselves! */
1377 ctdb_db = find_ctdb_db(ctdb, db_id);
1379 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1383 if (ctdb_db->unhealthy_reason) {
1384 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1385 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1389 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1390 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1395 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Periodic timer handler; p is the ctdb_db_context.
 *
 * When the tdb sequence number differs from the cached value, an
 * UPDATE_SEQNUM control carrying the db_id is broadcast without reply and
 * the cached value is refreshed. The handler then re-arms itself.
 *
 * The timer offset splits tunable.seqnum_interval into a quotient and a
 * remainder of 1000, so the tunable is presumably in milliseconds
 * (assuming timeval_current_ofs takes seconds and microseconds - confirm).
 */
1397 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1398 struct timeval t, void *p)
1400 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1401 struct ctdb_context *ctdb = ctdb_db->ctdb;
1402 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1403 if (new_seqnum != ctdb_db->seqnum) {
1404 /* something has changed - propagate it */
1406 data.dptr = (uint8_t *)&ctdb_db->db_id;
1407 data.dsize = sizeof(uint32_t);
1408 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1409 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1412 ctdb_db->seqnum = new_seqnum;
1414 /* setup a new timer */
1415 ctdb_db->seqnum_update =
1416 event_add_timed(ctdb->ev, ctdb_db,
1417 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1418 ctdb_ltdb_seqnum_check, ctdb_db);
1422 enable seqnum handling on this db
/*
 * Enable sequence number handling for db_id.
 *
 * If no seqnum timer exists yet, one is armed with
 * ctdb_ltdb_seqnum_check. Sequence numbers are then enabled on the tdb
 * and the current value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the check for an unknown db and the returns are missing
 * from this listing.
 */
1424 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1426 struct ctdb_db_context *ctdb_db;
1427 ctdb_db = find_ctdb_db(ctdb, db_id);
1429 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1433 if (ctdb_db->seqnum_update == NULL) {
1434 ctdb_db->seqnum_update =
1435 event_add_timed(ctdb->ev, ctdb_db,
1436 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1437 ctdb_ltdb_seqnum_check, ctdb_db);
1440 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1441 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: set the priority of a database.
 *
 * indata is read as a struct ctdb_db_priority. Priorities outside the
 * range 1..NUM_DB_PRIORITIES are rejected with an error message;
 * otherwise the value is stored in ctdb_db->priority and logged.
 *
 * NOTE(review): no length check on indata is visible here; presumably
 * the dispatcher validates it - confirm. The check for an unknown db and
 * the returns are missing from this listing, and the function may
 * continue past the last visible line.
 */
1445 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1447 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1448 struct ctdb_db_context *ctdb_db;
1450 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1452 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1456 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1457 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1461 ctdb_db->priority = db_prio->priority;
1462 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));