2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 this is the dummy null procedure that all databases support
/*
 * Call handler taking the per-call info structure. ctdb_local_attach()
 * registers it for every database under CTDB_NULL_FUNC.
 * NOTE(review): the function body is not visible in this listing; confirm
 * its return value against the full file.
 */
38 static int ctdb_null_func(struct ctdb_call_info *call)
44 this is a plain fetch procedure that all databases support
/*
 * Fetch call handler: points the reply at the record data already held in
 * the call info, so the reply aliases call->record_data rather than
 * copying it. Registered under CTDB_FETCH_FUNC by ctdb_local_attach().
 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
48 call->reply_data = &call->record_data;
53 this is a plain fetch procedure that all databases support
54 this returns the full record including the ltdb header
/*
 * Fetch call handler that returns the ltdb header followed by the record
 * data in one freshly allocated buffer.
 *
 * The TDB_DATA reply is allocated as a talloc child of `call`, and its
 * payload as a child of the reply, so both share the lifetime of the call.
 * Both allocations are NULL-checked; what happens on failure is on lines
 * not visible in this listing.
 */
56 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
58 call->reply_data = talloc(call, TDB_DATA);
59 if (call->reply_data == NULL) {
/* reply size = fixed header + variable-length record payload */
62 call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
63 call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
64 if (call->reply_data->dptr == NULL) {
/* header first, record data immediately after it */
67 memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
68 memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
75 * write a record to a normal database
77 * This is the server-variant of the ctdb_ltdb_store function.
78 * It contains logic to determine whether a record should be
79 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
80 * controls to the local ctdb daemon if appropriate.
/*
 * Server-side store routine; ctdb_local_attach() assigns it to
 * ctdb_db->ctdb_ltdb_store_fn.
 *
 * Visible flow:
 *  - In torture mode (CTDB_FLAG_TORTURE) the existing record is fetched and
 *    an "RSN regression" is logged if its rsn is larger than the new one.
 *  - A chain of tests on data.dsize, ctdb_db->persistent, the header flags
 *    (AUTOMATIC, MIGRATED_WITH_DATA, VACUUM_MIGRATED) and on whether this
 *    node is lmaster/dmaster decides between storing and deleting. The
 *    assignments that record the decision are on lines not visible here;
 *    the later `keep?...` expressions show it is held in `keep`.
 *  - schedule_for_deletion is raised in two of those branches and cleared
 *    again on the failure path near the end.
 *  - VACUUM_MIGRATED and AUTOMATIC are cleared from header->flags before the
 *    on-disk record (header followed by data) is assembled in a buffer
 *    allocated on `ctdb`; that buffer is freed after the store/delete.
 *  - When ctdb_db->seqnum_update is set and the stored payload (everything
 *    after the header) is unchanged, TDB_SEQNUM is temporarily removed from
 *    the tdb so the write does not bump the sequence number; it is added
 *    back afterwards when seqnum_suppressed is true.
 *  - The record is written with tdb_store(TDB_REPLACE) or removed with
 *    tdb_delete; if schedule_for_deletion is still set,
 *    ctdb_local_schedule_for_deletion() is called and a failure is logged.
 *
 * NOTE(review): parameters other than ctdb_db and header (the code uses
 * `key` and `data`) are declared on lines missing from this listing.
 */
82 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
84 struct ctdb_ltdb_header *header,
87 struct ctdb_context *ctdb = ctdb_db->ctdb;
90 bool seqnum_suppressed = false;
92 bool schedule_for_deletion = false;
95 if (ctdb->flags & CTDB_FLAG_TORTURE) {
96 struct ctdb_ltdb_header *h2;
97 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
98 h2 = (struct ctdb_ltdb_header *)rec.dptr;
/* NOTE(review): h2 is a pointer, so sizeof(h2) is the pointer size, not the
   header size; sizeof(*h2) looks like the intended bound before h2->rsn is
   read - confirm. */
99 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
100 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
101 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
103 if (rec.dptr) free(rec.dptr);
106 if (ctdb->vnn_map == NULL) {
108 * Called from a client: always store the record
109 * Also don't call ctdb_lmaster since it uses the vnn_map!
115 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
118 * If we migrate an empty record off to another node
119 * and the record has not been migrated with data,
120 * delete the record instead of storing the empty record.
122 if (data.dsize != 0) {
124 } else if (ctdb_db->persistent) {
126 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
128 * The record is not created by the client but
129 * automatically by the ctdb_ltdb_fetch logic that
130 * creates a record with an initial header in the
131 * ltdb before trying to migrate the record from
132 * the current lmaster. Keep it instead of trying
133 * to delete the non-existing record...
136 schedule_for_deletion = true;
137 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
139 } else if (ctdb_db->ctdb->pnn == lmaster) {
141 * If we are lmaster, then we usually keep the record.
142 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
143 * and the record is empty and has never been migrated
144 * with data, then we should delete it instead of storing it.
145 * This is part of the vacuuming process.
147 * The reason that we usually need to store even empty records
148 * on the lmaster is that a client operating directly on the
149 * lmaster (== dmaster) expects the local copy of the record to
150 * exist after successful ctdb migrate call. If the record does
151 * not exist, the client goes into a migrate loop and eventually
152 * fails. So storing the empty record makes sure that we do not
153 * need to change the client code.
155 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
157 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
160 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
166 !ctdb_db->persistent &&
167 (ctdb_db->ctdb->pnn == header->dmaster))
169 schedule_for_deletion = true;
174 * The VACUUM_MIGRATED flag is only set temporarily for
175 * the above logic when the record was retrieved by a
176 * VACUUM_MIGRATE call and should not be stored in the
179 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
180 * and there are two cases in which the corresponding record
181 * is stored in the local database:
182 * 1. The record has been migrated with data in the past
183 * (the MIGRATED_WITH_DATA record flag is set).
184 * 2. The record has been filled with data again since it
185 * had been submitted in the VACUUM_FETCH message to the
187 * For such records it is important to not store the
188 * VACUUM_MIGRATED flag in the database.
190 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
193 * Similarly, clear the AUTOMATIC flag which should not enter
194 * the local database copy since this would require client
195 * modifications to clear the flag when the client stores
198 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
200 rec.dsize = sizeof(*header) + data.dsize;
201 rec.dptr = talloc_size(ctdb, rec.dsize);
202 CTDB_NO_MEMORY(ctdb, rec.dptr);
204 memcpy(rec.dptr, header, sizeof(*header));
205 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
207 /* Databases with seqnum updates enabled only get their seqnum
208 changes when/if we modify the data */
209 if (ctdb_db->seqnum_update != NULL) {
211 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
213 if ( (old.dsize == rec.dsize)
214 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
215 rec.dptr+sizeof(struct ctdb_ltdb_header),
216 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
217 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218 seqnum_suppressed = true;
220 if (old.dptr) free(old.dptr);
223 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
225 keep?"storing":"deleting",
229 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
231 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
238 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
243 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
246 keep?"store":"delete", ret,
247 tdb_errorstr(ctdb_db->ltdb->tdb)));
249 schedule_for_deletion = false;
251 if (seqnum_suppressed) {
252 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
255 talloc_free(rec.dptr);
257 if (schedule_for_deletion) {
259 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
261 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
/*
 * State kept for a request that is parked until a chainlock is obtained:
 * the ctdb context, the packet handler to re-invoke, the parked request
 * header, and a flag saying whether a vnn_map generation change should be
 * ignored when the request is replayed.
 * NOTE(review): users of this struct also access `recv_context` and
 * `generation`; their declarations are on lines missing from this listing.
 */
268 struct lock_fetch_state {
269 struct ctdb_context *ctdb;
270 void (*recv_pkt)(void *, struct ctdb_req_header *);
272 struct ctdb_req_header *hdr;
274 bool ignore_generation;
278 called when we should retry the operation
/*
 * Callback handed to ctdb_lockwait() by ctdb_ltdb_lock_requeue().
 *
 * `p` is the struct lock_fetch_state for the parked request. Unless
 * ignore_generation is set, a request whose recorded generation no longer
 * matches the current vnn_map generation is dropped (its header is freed).
 * Otherwise the saved handler is re-invoked with the saved context and
 * header, i.e. the packet is fed back into normal processing.
 */
280 static void lock_fetch_callback(void *p)
282 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
283 if (!state->ignore_generation &&
284 state->generation != state->ctdb->vnn_map->generation) {
285 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
286 talloc_free(state->hdr);
289 state->recv_pkt(state->recv_context, state->hdr);
290 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
295 do a non-blocking ltdb_lock, deferring this ctdb request until we
298 It does the following:
300 1) tries to get the chainlock. If it succeeds, then it returns 0
302 2) if it fails to get a chainlock immediately then it sets up a
303 non-blocking chainlock via ctdb_lockwait, and when it gets the
304 chainlock it re-submits this ctdb request to the main packet
307 This effectively queues all ctdb requests that cannot be
308 immediately satisfied until it can get the lock. This means that
309 the main ctdb daemon will not block waiting for a chainlock held by
312 There are 3 possible return values:
314 0: means that it got the lock immediately.
315 -1: means that it failed to get the lock, and won't retry
316 -2: means that it failed to get the lock immediately, but will retry
/*
 * Parameters:
 *   ctdb_db            database whose ltdb chain is to be locked
 *   key                record key selecting the chain
 *   hdr                the request being processed; becomes the talloc
 *                      parent of the deferred state and is later moved
 *                      under that state so it outlives the caller's
 *                      temporary context
 *   recv_pkt           handler re-invoked with (recv_context, hdr) once the
 *                      lock has been obtained
 *   recv_context       opaque first argument for recv_pkt
 *   ignore_generation  if true, replay the request even when the vnn_map
 *                      generation changed while waiting
 *
 * A non-blocking chainlock is attempted first. errno values other than
 * EACCES, EAGAIN and EDEADLK are treated as hard failures. In torture mode
 * a lock that was obtained may be released again so the contended path gets
 * exercised (the rest of that condition is not visible in this listing).
 * On contention a lock_fetch_state is filled in and ctdb_lockwait() is
 * asked to call lock_fetch_callback() later.
 */
318 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
319 TDB_DATA key, struct ctdb_req_header *hdr,
320 void (*recv_pkt)(void *, struct ctdb_req_header *),
321 void *recv_context, bool ignore_generation)
324 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
325 struct lockwait_handle *h;
326 struct lock_fetch_state *state;
328 ret = tdb_chainlock_nonblock(tdb, key);
331 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
332 /* a hard failure - don't try again */
336 /* when torturing, ensure we test the contended path */
337 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
340 tdb_chainunlock(tdb, key);
343 /* first the non-contended path */
/* NOTE(review): the allocation below is dereferenced on the following line;
   no NULL check is visible between the two - confirm and add one if
   missing. */
348 state = talloc(hdr, struct lock_fetch_state);
349 state->ctdb = ctdb_db->ctdb;
351 state->recv_pkt = recv_pkt;
352 state->recv_context = recv_context;
353 state->generation = ctdb_db->ctdb->vnn_map->generation;
354 state->ignore_generation = ignore_generation;
356 /* now the contended path */
357 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
362 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
363 so it won't be freed yet */
364 talloc_steal(state, hdr);
365 talloc_steal(state, h);
367 /* now tell the caller than we will retry asynchronously */
372 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Takes the chainlock via ctdb_ltdb_lock_requeue() (same key, hdr, recv_pkt,
 * recv_context and ignore_generation arguments) and then reads the record
 * into *header and *data with ctdb_ltdb_fetch(), using hdr as the memory
 * context argument. The visible code also calls ctdb_ltdb_unlock() and
 * logs if that fails; the conditions guarding the fetch and the unlock are
 * on lines not visible in this listing.
 */
374 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
375 TDB_DATA key, struct ctdb_ltdb_header *header,
376 struct ctdb_req_header *hdr, TDB_DATA *data,
377 void (*recv_pkt)(void *, struct ctdb_req_header *),
378 void *recv_context, bool ignore_generation)
382 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
383 recv_context, ignore_generation);
385 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
388 uret = ctdb_ltdb_unlock(ctdb_db, key);
390 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
399 paranoid check to see if the db is empty
/*
 * Sanity check used when attaching: counts the records in the local tdb by
 * running a read-only traverse with no callback. When the (not visible)
 * test on the count fires, an alert is logged and ctdb_fatal() is called
 * with "database not empty on attach".
 */
401 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
403 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
404 int count = tdb_traverse_read(tdb, NULL, NULL);
406 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
408 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refreshes ctdb_db->unhealthy_reason from the persistent-health tdb.
 *
 * The lookup key is the database name without its terminating NUL
 * (dsize = strlen). The previous reason is remembered and the field is
 * cleared before the fetch. A fetched value is duplicated onto ctdb_db with
 * talloc_strndup(); if that allocation fails the previous reason is put
 * back. Handling of the fetched buffer and of the old string after success
 * is on lines not visible in this listing.
 */
412 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
413 struct ctdb_db_context *ctdb_db)
415 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
421 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
422 key.dsize = strlen(ctdb_db->db_name);
424 old = ctdb_db->unhealthy_reason;
425 ctdb_db->unhealthy_reason = NULL;
427 val = tdb_fetch(tdb, key);
429 reason = talloc_strndup(ctdb_db,
430 (const char *)val.dptr,
432 if (reason == NULL) {
433 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
435 ctdb_db->unhealthy_reason = old;
446 ctdb_db->unhealthy_reason = reason;
/*
 * Updates the health entry of ctdb_db inside a transaction on the
 * persistent-health tdb and mirrors the result in
 * ctdb_db->unhealthy_reason.
 *
 * Parameters:
 *   given_reason       text to record, or NULL meaning "healthy"
 *   num_healthy_nodes  when 0 and an old reason exists, the old reason is
 *                      kept and (per the strncmp test) prefixed with
 *                      "NO-HEALTHY-NODES - "
 *
 * Visible flow: start the transaction, reload the current reason with
 * ctdb_load_persistent_health(), build new_reason (copy of given_reason, or
 * prefix + old reason), then either tdb_store() the new reason under the
 * database name or tdb_delete() the entry when only an old reason exists.
 * Store/delete failures cancel the transaction. After a successful commit
 * the old reason is freed and new_reason becomes the in-memory reason.
 * The guards around several of these steps are on lines not visible in
 * this listing.
 */
450 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
451 struct ctdb_db_context *ctdb_db,
452 const char *given_reason,/* NULL means healthy */
453 int num_healthy_nodes)
455 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
459 char *new_reason = NULL;
460 char *old_reason = NULL;
462 ret = tdb_transaction_start(tdb);
464 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
465 tdb_name(tdb), ret, tdb_errorstr(tdb)));
469 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
471 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
472 ctdb_db->db_name, ret));
475 old_reason = ctdb_db->unhealthy_reason;
477 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
478 key.dsize = strlen(ctdb_db->db_name);
481 new_reason = talloc_strdup(ctdb_db, given_reason);
482 if (new_reason == NULL) {
483 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
487 } else if (old_reason && num_healthy_nodes == 0) {
489 * If the reason indicates ok, but there where no healthy nodes
490 * available, that it means, we have not recovered valid content
491 * of the db. So if there's an old reason, prefix it with
492 * "NO-HEALTHY-NODES - "
496 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
497 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
499 prefix = _TMP_PREFIX;
503 new_reason = talloc_asprintf(ctdb_db, "%s%s",
505 if (new_reason == NULL) {
506 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
507 prefix, old_reason));
514 val.dptr = discard_const_p(uint8_t, new_reason);
515 val.dsize = strlen(new_reason);
517 ret = tdb_store(tdb, key, val, TDB_REPLACE);
519 tdb_transaction_cancel(tdb);
520 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
521 tdb_name(tdb), ctdb_db->db_name, new_reason,
522 ret, tdb_errorstr(tdb)));
523 talloc_free(new_reason);
526 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
527 ctdb_db->db_name, new_reason));
528 } else if (old_reason) {
529 ret = tdb_delete(tdb, key);
531 tdb_transaction_cancel(tdb);
532 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
533 tdb_name(tdb), ctdb_db->db_name,
534 ret, tdb_errorstr(tdb)));
535 talloc_free(new_reason);
538 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
542 ret = tdb_transaction_commit(tdb);
543 if (ret != TDB_SUCCESS) {
544 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
545 tdb_name(tdb), ret, tdb_errorstr(tdb)));
546 talloc_free(new_reason);
550 talloc_free(old_reason);
551 ctdb_db->unhealthy_reason = new_reason;
/*
 * Moves a corrupted database file out of the way and records that fact in
 * the persistent-health database.
 *
 * A new path of the form "<...>.corrupted.<YYYYMMDDhhmmss>.0Z" is built
 * from the current time, a reason string naming the backup is stored via
 * ctdb_update_persistent_health() (with 0 healthy nodes), and
 * ctdb_db->db_path is then renamed to the new path. new_path is freed on
 * both the rename-failure and the success path.
 * NOTE(review): one failure branch logs "not implemented yet" although the
 * function performs the rename below; the message looks stale - confirm.
 */
556 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
557 struct ctdb_db_context *ctdb_db)
559 time_t now = time(NULL);
567 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
568 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
569 "%04u%02u%02u%02u%02u%02u.0Z",
571 tm->tm_year+1900, tm->tm_mon+1,
572 tm->tm_mday, tm->tm_hour, tm->tm_min,
574 if (new_path == NULL) {
575 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
579 new_reason = talloc_asprintf(ctdb_db,
580 "ERROR - Backup of corrupted TDB in '%s'",
582 if (new_reason == NULL) {
583 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
586 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
587 talloc_free(new_reason);
589 DEBUG(DEBUG_CRIT,(__location__
590 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
595 ret = rename(ctdb_db->db_path, new_path);
597 DEBUG(DEBUG_CRIT,(__location__
598 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
599 ctdb_db->db_path, new_path,
600 errno, strerror(errno)));
601 talloc_free(new_path);
605 DEBUG(DEBUG_CRIT,(__location__
606 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
607 ctdb_db->db_path, new_path));
608 talloc_free(new_path);
/*
 * Walks ctdb->db_list and, for each persistent database, reloads its
 * health record with ctdb_load_persistent_health(). Databases without an
 * unhealthy_reason are logged as healthy, the others as unhealthy together
 * with the reason. A summary line with OK/FAIL counts is logged at the end,
 * at ALERT level when `fail` is non-zero and NOTICE otherwise. The counter
 * updates and the return value are on lines not visible in this listing.
 */
612 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
614 struct ctdb_db_context *ctdb_db;
619 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
620 if (!ctdb_db->persistent) {
624 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
626 DEBUG(DEBUG_ALERT,(__location__
627 " load persistent health for '%s' failed\n",
632 if (ctdb_db->unhealthy_reason == NULL) {
634 DEBUG(DEBUG_INFO,(__location__
635 " persistent db '%s' healthy\n",
641 DEBUG(DEBUG_ALERT,(__location__
642 " persistent db '%s' unhealthy: %s\n",
644 ctdb_db->unhealthy_reason));
646 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
647 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
659 mark a database - as healthy
/*
 * Control handler that marks a database healthy.
 *
 * indata carries the 32-bit database id, read directly from indata.dptr.
 * NOTE(review): no length check on indata is visible here; presumably the
 * caller validates the size - confirm.
 * The database is looked up with find_ctdb_db(), and its health entry is
 * updated via ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1). If
 * may_recover is set and startup has not completed, recovery mode is forced
 * to CTDB_RECOVERY_ACTIVE. Where may_recover is set is not visible in this
 * listing (presumably in the unhealthy_reason branch).
 */
661 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
663 uint32_t db_id = *(uint32_t *)indata.dptr;
664 struct ctdb_db_context *ctdb_db;
666 bool may_recover = false;
668 ctdb_db = find_ctdb_db(ctdb, db_id);
670 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
674 if (ctdb_db->unhealthy_reason) {
678 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
680 DEBUG(DEBUG_ERR,(__location__
681 " ctdb_update_persistent_health(%s) failed\n",
686 if (may_recover && !ctdb->done_startup) {
687 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
689 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler that reports a database's health.
 *
 * The 32-bit database id is read from indata.dptr, the database is looked
 * up with find_ctdb_db(), and its health record is reloaded. When an
 * unhealthy reason exists, outdata is pointed at that string (not copied)
 * with a length that includes the terminating NUL. The healthy case and the
 * remaining parameters are on lines not visible in this listing.
 */
695 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
699 uint32_t db_id = *(uint32_t *)indata.dptr;
700 struct ctdb_db_context *ctdb_db;
703 ctdb_db = find_ctdb_db(ctdb, db_id);
705 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
709 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
711 DEBUG(DEBUG_ERR,(__location__
712 " ctdb_load_persistent_health(%s) failed\n",
718 if (ctdb_db->unhealthy_reason) {
719 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
720 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
/*
 * Enables the readonly property on a database by opening a companion
 * tracking tdb at "<db_path>.RO".
 *
 * A database that already has readonly set is handled by an early branch,
 * and persistent databases are rejected with an error message. The tracking
 * tdb is opened with TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC and the
 * hash size from the database_hash_size tunable; on success
 * ctdb_db->readonly is set to true.
 * NOTE(review): the first DEBUG line logs at DEBUG_ERR with an "XXX"
 * prefix, which looks like leftover tracing - confirm the intended level.
 */
727 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
731 DEBUG(DEBUG_ERR,("XXX set db readonly %s\n", ctdb_db->db_name));
733 if (ctdb_db->readonly) {
737 if (ctdb_db->persistent) {
738 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
742 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
743 if (ropath == NULL) {
744 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
747 ctdb_db->rottdb = tdb_open(ropath,
748 ctdb->tunable.database_hash_size,
749 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
751 if (ctdb_db->rottdb == NULL) {
752 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
757 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
759 ctdb_db->readonly = true;
765 attach to a database, handling both persistent and non-persistent databases
766 return 0 on success, -1 on failure
/*
 * Creates and registers the local context for one database.
 *
 * Parameters visible here:
 *   ctdb              daemon context; also the talloc parent of the new
 *                     ctdb_db_context
 *   db_name           database name; its hash (including the terminating
 *                     NUL) becomes the db_id
 *   persistent        selects the persistent directory and TDB_DEFAULT
 *                     flags instead of TDB_CLEAR_IF_FIRST|TDB_NOSYNC
 *   unhealthy_reason  if non-NULL, recorded through
 *                     ctdb_update_persistent_health() before opening
 * (a further parameter is declared on a line missing from this listing)
 *
 * Visible flow:
 *  - allocate the context, set priority 1, copy the name, compute db_id;
 *  - for non-persistent databases create the delete queue; the store
 *    callback ctdb_ltdb_store_server is installed;
 *  - reject a db_id that collides with an already attached database;
 *  - load and evaluate the persistent health record; an unhealthy database
 *    is refused when remaining_tries is 0 and only warned about otherwise;
 *  - build db_path and the tdb flags (TDB_NOMMAP when valgrinding,
 *    TDB_DISALLOW_NESTING, optionally TDB_INCOMPATIBLE_HASH) and open the
 *    tdb with tdb_wrap_open();
 *  - on open or tdb_check() failure, either give up or back the file up
 *    with ctdb_backup_corrupted_tdb(); the retry control flow is on lines
 *    not visible in this listing;
 *  - add the context to ctdb->db_list, set max dead records, register the
 *    null / fetch / fetch-with-header call handlers and initialise
 *    vacuuming. Each failure path frees ctdb_db.
 */
768 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
769 bool persistent, const char *unhealthy_reason,
772 struct ctdb_db_context *ctdb_db, *tmp_db;
777 int remaining_tries = 0;
779 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
780 CTDB_NO_MEMORY(ctdb, ctdb_db);
782 ctdb_db->priority = 1;
783 ctdb_db->ctdb = ctdb;
784 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
785 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
787 key.dsize = strlen(db_name)+1;
788 key.dptr = discard_const(db_name);
789 ctdb_db->db_id = ctdb_hash(&key);
790 ctdb_db->persistent = persistent;
792 if (!ctdb_db->persistent) {
793 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
794 if (ctdb_db->delete_queue == NULL) {
795 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
798 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
801 /* check for hash collisions */
802 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
803 if (tmp_db->db_id == ctdb_db->db_id) {
804 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
805 tmp_db->db_id, db_name, tmp_db->db_name));
806 talloc_free(ctdb_db);
812 if (unhealthy_reason) {
813 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
814 unhealthy_reason, 0);
816 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
817 ctdb_db->db_name, unhealthy_reason, ret));
818 talloc_free(ctdb_db);
823 if (ctdb->max_persistent_check_errors > 0) {
826 if (ctdb->done_startup) {
830 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
832 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
833 ctdb_db->db_name, ret));
834 talloc_free(ctdb_db);
839 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
840 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
841 ctdb_db->db_name, ctdb_db->unhealthy_reason));
842 talloc_free(ctdb_db);
846 if (ctdb_db->unhealthy_reason) {
847 /* this is just a warning, but we want that in the log file! */
848 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
849 ctdb_db->db_name, ctdb_db->unhealthy_reason));
852 /* open the database */
853 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
854 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
857 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
858 if (ctdb->valgrinding) {
859 tdb_flags |= TDB_NOMMAP;
861 tdb_flags |= TDB_DISALLOW_NESTING;
863 tdb_flags |= TDB_INCOMPATIBLE_HASH;
867 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
868 ctdb->tunable.database_hash_size,
870 O_CREAT|O_RDWR, mode);
871 if (ctdb_db->ltdb == NULL) {
873 int saved_errno = errno;
876 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
879 strerror(saved_errno)));
880 talloc_free(ctdb_db);
884 if (remaining_tries == 0) {
885 DEBUG(DEBUG_CRIT,(__location__
886 "Failed to open persistent tdb '%s': %d - %s\n",
889 strerror(saved_errno)));
890 talloc_free(ctdb_db);
894 ret = stat(ctdb_db->db_path, &st);
896 DEBUG(DEBUG_CRIT,(__location__
897 "Failed to open persistent tdb '%s': %d - %s\n",
900 strerror(saved_errno)));
901 talloc_free(ctdb_db);
905 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
907 DEBUG(DEBUG_CRIT,(__location__
908 "Failed to open persistent tdb '%s': %d - %s\n",
911 strerror(saved_errno)));
912 talloc_free(ctdb_db);
922 ctdb_check_db_empty(ctdb_db);
924 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
929 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
930 ctdb_db->db_path, ret,
931 tdb_errorstr(ctdb_db->ltdb->tdb)));
932 if (remaining_tries == 0) {
933 talloc_free(ctdb_db);
937 fd = tdb_fd(ctdb_db->ltdb->tdb);
938 ret = fstat(fd, &st);
940 DEBUG(DEBUG_CRIT,(__location__
941 "Failed to fstat() persistent tdb '%s': %d - %s\n",
945 talloc_free(ctdb_db);
950 talloc_free(ctdb_db->ltdb);
951 ctdb_db->ltdb = NULL;
953 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
955 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
957 talloc_free(ctdb_db);
967 DLIST_ADD(ctdb->db_list, ctdb_db);
969 /* setting this can help some high churn databases */
970 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
973 all databases support the "null" function. we need this in
974 order to do forced migration of records
976 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
978 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
979 talloc_free(ctdb_db);
984 all databases support the "fetch" function. we need this
985 for efficient Samba3 ctdb fetch
987 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
989 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
990 talloc_free(ctdb_db);
995 all databases support the "fetch_with_header" function. we need this
996 for efficient readonly record fetches
998 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1000 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1001 talloc_free(ctdb_db);
1005 ret = ctdb_vacuum_init(ctdb_db);
1007 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1008 "database '%s'\n", ctdb_db->db_name));
1009 talloc_free(ctdb_db);
1014 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One deferred DB-attach request: list links for the
 * ctdb->deferred_attach list, the owning ctdb context, and the original
 * control request that will be answered or replayed later.
 */
1021 struct ctdb_deferred_attach_context {
1022 struct ctdb_deferred_attach_context *next, *prev;
1023 struct ctdb_context *ctdb;
1024 struct ctdb_req_control *c;
/*
 * talloc destructor for a deferred attach context (installed by
 * ctdb_control_db_attach): unlinks the context from the
 * ctdb->deferred_attach list when it is freed.
 */
1028 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1030 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timed-event handler armed when an attach is deferred. It answers the
 * stored control request with status -1 and no data, then frees the
 * deferred context. private_data is the ctdb_deferred_attach_context.
 */
1035 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1037 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1038 struct ctdb_context *ctdb = da_ctx->ctdb;
1040 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1041 talloc_free(da_ctx);
/*
 * Timed-event handler scheduled by ctdb_process_deferred_attach(). It feeds
 * the stored control packet back into ctdb_input_pkt() so the attach is
 * processed again, then frees the deferred context. private_data is the
 * ctdb_deferred_attach_context.
 */
1044 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1046 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1047 struct ctdb_context *ctdb = da_ctx->ctdb;
1049 /* This talloc-steals the packet ->c */
1050 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1051 talloc_free(da_ctx);
/*
 * Drains ctdb->deferred_attach: each context is unlinked from the list and
 * a timed event is added (as a talloc child of the context) that runs
 * ctdb_deferred_attach_callback() for it. The event time is
 * timeval_current_ofs(1,0).
 * NOTE(review): presumably that means one second from now, which would not
 * match the in-body comment's "as soon as" wording - confirm.
 */
1054 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1056 struct ctdb_deferred_attach_context *da_ctx;
1058 /* call it from the main event loop as soon as the current event
1061 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1062 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1063 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1070 a client has asked to attach a new database
/*
 * Control handler for a DB attach request.
 *
 * Parameters visible here:
 *   indata      database name as a C string
 *   outdata     on success points at the attached database's db_id (the
 *               field inside the db context, not a copy)
 *   tdb_flags   client-requested flags; masked down to
 *               TDB_NOSYNC|TDB_INCOMPATIBLE_HASH
 *   persistent  selects the persistent attach path and control opcode
 *   client_id   non-zero for local clients; used to look up the client
 *   c           the control request, kept when the attach is deferred
 * (a further parameter, used below as `async_reply`, is declared on a line
 * missing from this listing)
 *
 * Visible flow:
 *  - refuse when the allow_client_db_attach tunable is 0;
 *  - for a known local client, refuse while the node is inactive, and while
 *    recovery is active before startup has completed defer the request
 *    (unless the client is the recovery daemon): the request is parked in a
 *    ctdb_deferred_attach_context with a timeout event and *async_reply is
 *    set to true;
 *  - if the database is already attached, return its id and add the
 *    requested flags;
 *  - otherwise attach locally via ctdb_local_attach(), look the handle up
 *    again, add the flags, lock down memory and broadcast the attach
 *    control to all nodes with CTDB_CTRL_FLAG_NOREPLY.
 *
 * NOTE(review): the denial message names "AllowClientDBAccess" while the
 * field tested is allow_client_db_attach - confirm the tunable's
 * user-visible name.
 */
1072 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1073 TDB_DATA *outdata, uint64_t tdb_flags,
1074 bool persistent, uint32_t client_id,
1075 struct ctdb_req_control *c,
1078 const char *db_name = (const char *)indata.dptr;
1079 struct ctdb_db_context *db;
1080 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1081 struct ctdb_client *client = NULL;
1083 if (ctdb->tunable.allow_client_db_attach == 0) {
1084 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1085 "AllowClientDBAccess == 0\n", db_name));
1089 /* dont allow any local clients to attach while we are in recovery mode
1090 * except for the recovery daemon.
1091 * allow all attach from the network since these are always from remote
1094 if (client_id != 0) {
1095 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1097 if (client != NULL) {
1098 /* If the node is inactive it is not part of the cluster
1099 and we should not allow clients to attach to any
1102 if (node->flags & NODE_FLAGS_INACTIVE) {
1103 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1107 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1108 && client->pid != ctdb->recoverd_pid
1109 && !ctdb->done_startup) {
1110 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1112 if (da_ctx == NULL) {
1113 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1117 da_ctx->ctdb = ctdb;
1118 da_ctx->c = talloc_steal(da_ctx, c);
1119 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1120 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1122 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1124 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1125 *async_reply = true;
1130 /* the client can optionally pass additional tdb flags, but we
1131 only allow a subset of those on the database in ctdb. Note
1132 that tdb_flags is passed in via the (otherwise unused)
1133 srvid to the attach control */
1134 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1136 /* see if we already have this name */
1137 db = ctdb_db_handle(ctdb, db_name);
1139 outdata->dptr = (uint8_t *)&db->db_id;
1140 outdata->dsize = sizeof(db->db_id);
1141 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1145 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1149 db = ctdb_db_handle(ctdb, db_name);
1151 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1155 /* remember the flags the client has specified */
1156 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1158 outdata->dptr = (uint8_t *)&db->db_id;
1159 outdata->dsize = sizeof(db->db_id);
1161 /* Try to ensure it's locked in mem */
1162 ctdb_lockdown_memory(ctdb);
1164 /* tell all the other nodes about this database */
1165 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1166 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1167 CTDB_CONTROL_DB_ATTACH,
1168 0, CTDB_CTRL_FLAG_NOREPLY,
1169 indata, NULL, NULL);
1177 attach to all existing persistent databases
/*
 * Scans ctdb->db_directory_persistent and attaches every file that belongs
 * to this node.
 *
 * A name qualifies when it contains ".tdb." followed only by digits and the
 * number parsed after ".tdb." equals ctdb->pnn; other entries are logged
 * and ignored. Qualifying names are attached with
 * ctdb_local_attach(..., true, unhealthy_reason, 0). How the name is
 * trimmed before attaching is on lines not visible in this listing.
 *
 * NOTE(review): if CTDB_NO_MEMORY returns from the function when the
 * talloc_strdup() fails, the directory handle `d` would be left open on
 * that path - confirm against the macro definition.
 */
1179 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1180 const char *unhealthy_reason)
1185 /* open the persistent db directory and scan it for files */
1186 d = opendir(ctdb->db_directory_persistent);
1191 while ((de=readdir(d))) {
1193 size_t len = strlen(de->d_name);
1195 int invalid_name = 0;
1197 s = talloc_strdup(ctdb, de->d_name);
1198 CTDB_NO_MEMORY(ctdb, s);
1200 /* only accept names ending in .tdb */
1201 p = strstr(s, ".tdb.");
1202 if (len < 7 || p == NULL) {
1207 /* only accept names ending with .tdb. and any number of digits */
1209 while (*q != 0 && invalid_name == 0) {
1210 if (!isdigit(*q++)) {
1214 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1215 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1221 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1222 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1228 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup routine that prepares the database directories, opens the
 * persistent-health tdb and attaches all existing persistent databases.
 *
 * Visible flow:
 *  - fill in default paths under VARDIR "/ctdb" for the volatile,
 *    persistent and state directories when they are unset;
 *  - create each directory with mode 0700, tolerating EEXIST;
 *  - build the health tdb path "<state dir>/persistent_health.tdb.<n>" and
 *    open it with TDB_DISALLOW_NESTING;
 *  - if the open or the following tdb_check() fails, prepare an
 *    unhealthy_reason warning text and retry the open with
 *    TDB_CLEAR_IF_FIRST (which, per the warning text, clears the stored
 *    health data). The conditions selecting between giving up and retrying
 *    are on lines not visible in this listing;
 *  - call ctdb_attach_persistent() with the (possibly NULL)
 *    unhealthy_reason and free the temporary strings.
 */
1236 int ctdb_attach_databases(struct ctdb_context *ctdb)
1239 char *persistent_health_path = NULL;
1240 char *unhealthy_reason = NULL;
1241 bool first_try = true;
1243 if (ctdb->db_directory == NULL) {
1244 ctdb->db_directory = VARDIR "/ctdb";
1246 if (ctdb->db_directory_persistent == NULL) {
1247 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1249 if (ctdb->db_directory_state == NULL) {
1250 ctdb->db_directory_state = VARDIR "/ctdb/state";
1253 /* make sure the db directory exists */
1254 ret = mkdir(ctdb->db_directory, 0700);
1255 if (ret == -1 && errno != EEXIST) {
1256 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1257 ctdb->db_directory));
1261 /* make sure the persistent db directory exists */
1262 ret = mkdir(ctdb->db_directory_persistent, 0700);
1263 if (ret == -1 && errno != EEXIST) {
1264 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1265 ctdb->db_directory_persistent));
1269 /* make sure the internal state db directory exists */
1270 ret = mkdir(ctdb->db_directory_state, 0700);
1271 if (ret == -1 && errno != EEXIST) {
1272 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1273 ctdb->db_directory_state));
1277 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1278 ctdb->db_directory_state,
1279 PERSISTENT_HEALTH_TDB,
1281 if (persistent_health_path == NULL) {
1282 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1288 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1289 0, TDB_DISALLOW_NESTING,
1290 O_CREAT | O_RDWR, 0600);
1291 if (ctdb->db_persistent_health == NULL) {
1292 struct tdb_wrap *tdb;
1295 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1296 persistent_health_path,
1299 talloc_free(persistent_health_path);
1300 talloc_free(unhealthy_reason);
1305 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1306 persistent_health_path,
1307 "was cleared after a failure",
1308 "manual verification needed");
1309 if (unhealthy_reason == NULL) {
1310 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1311 talloc_free(persistent_health_path);
1315 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1316 persistent_health_path));
1317 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1318 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1319 O_CREAT | O_RDWR, 0600);
1321 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1322 persistent_health_path,
1325 talloc_free(persistent_health_path);
1326 talloc_free(unhealthy_reason);
1333 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1335 struct tdb_wrap *tdb;
1337 talloc_free(ctdb->db_persistent_health);
1338 ctdb->db_persistent_health = NULL;
1341 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1342 persistent_health_path));
1343 talloc_free(persistent_health_path);
1344 talloc_free(unhealthy_reason);
1349 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1350 persistent_health_path,
1351 "was cleared after a failure",
1352 "manual verification needed");
1353 if (unhealthy_reason == NULL) {
1354 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1355 talloc_free(persistent_health_path);
1359 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1360 persistent_health_path));
1361 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1362 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1363 O_CREAT | O_RDWR, 0600);
1365 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1366 persistent_health_path,
1369 talloc_free(persistent_health_path);
1370 talloc_free(unhealthy_reason);
1377 talloc_free(persistent_health_path);
1379 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1380 talloc_free(unhealthy_reason);
1389 called when a broadcast seqnum update comes in
1391 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1393 struct ctdb_db_context *ctdb_db;
1394 if (srcnode == ctdb->pnn) {
1395 /* don't update ourselves! */
1399 ctdb_db = find_ctdb_db(ctdb, db_id);
1401 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1405 if (ctdb_db->unhealthy_reason) {
1406 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1407 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1411 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1412 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1417 timer to check for seqnum changes in a ltdb and propogate them
1419 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1420 struct timeval t, void *p)
1422 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1423 struct ctdb_context *ctdb = ctdb_db->ctdb;
1424 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1425 if (new_seqnum != ctdb_db->seqnum) {
1426 /* something has changed - propogate it */
1428 data.dptr = (uint8_t *)&ctdb_db->db_id;
1429 data.dsize = sizeof(uint32_t);
1430 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1431 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1434 ctdb_db->seqnum = new_seqnum;
1436 /* setup a new timer */
1437 ctdb_db->seqnum_update =
1438 event_add_timed(ctdb->ev, ctdb_db,
1439 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1440 ctdb_ltdb_seqnum_check, ctdb_db);
1444 enable seqnum handling on this db
1446 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1448 struct ctdb_db_context *ctdb_db;
1449 ctdb_db = find_ctdb_db(ctdb, db_id);
1451 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1455 if (ctdb_db->seqnum_update == NULL) {
1456 ctdb_db->seqnum_update =
1457 event_add_timed(ctdb->ev, ctdb_db,
1458 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1459 ctdb_ltdb_seqnum_check, ctdb_db);
1462 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1463 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1467 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1469 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1470 struct ctdb_db_context *ctdb_db;
1472 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1474 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1478 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1479 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1483 ctdb_db->priority = db_prio->priority;
1484 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));