2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 call->reply_data = &call->record_data;
53 struct lock_fetch_state {
54 struct ctdb_context *ctdb;
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
57 struct ctdb_req_header *hdr;
59 bool ignore_generation;
63 called when we should retry the operation
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
147 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148 so it won't be freed yet */
149 talloc_steal(state, hdr);
150 talloc_steal(state, h);
152 /* now tell the caller that we will retry asynchronously */
157 a variant of ctdb_ltdb_lock_requeue that also fetches the record
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
160 TDB_DATA key, struct ctdb_ltdb_header *header,
161 struct ctdb_req_header *hdr, TDB_DATA *data,
162 void (*recv_pkt)(void *, struct ctdb_req_header *),
163 void *recv_context, bool ignore_generation)
167 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
168 recv_context, ignore_generation);
170 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
173 uret = ctdb_ltdb_unlock(ctdb_db, key);
175 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
184 paranoid check to see if the db is empty
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
188 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
189 int count = tdb_traverse_read(tdb, NULL, NULL);
191 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
193 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198 struct ctdb_db_context *ctdb_db)
200 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
206 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207 key.dsize = strlen(ctdb_db->db_name);
209 old = ctdb_db->unhealthy_reason;
210 ctdb_db->unhealthy_reason = NULL;
212 val = tdb_fetch(tdb, key);
214 reason = talloc_strndup(ctdb_db,
215 (const char *)val.dptr,
217 if (reason == NULL) {
218 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
220 ctdb_db->unhealthy_reason = old;
231 ctdb_db->unhealthy_reason = reason;
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236 struct ctdb_db_context *ctdb_db,
237 const char *given_reason,/* NULL means healthy */
238 int num_healthy_nodes)
240 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
244 char *new_reason = NULL;
245 char *old_reason = NULL;
247 ret = tdb_transaction_start(tdb);
249 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250 tdb_name(tdb), ret, tdb_errorstr(tdb)));
254 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
256 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257 ctdb_db->db_name, ret));
260 old_reason = ctdb_db->unhealthy_reason;
262 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263 key.dsize = strlen(ctdb_db->db_name);
266 new_reason = talloc_strdup(ctdb_db, given_reason);
267 if (new_reason == NULL) {
268 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
272 } else if (old_reason && num_healthy_nodes == 0) {
274 * If the reason indicates ok, but there were no healthy nodes
275 * available, then it means we have not recovered valid content
276 * of the db. So if there's an old reason, prefix it with
277 * "NO-HEALTHY-NODES - "
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
284 prefix = _TMP_PREFIX;
288 new_reason = talloc_asprintf(ctdb_db, "%s%s",
290 if (new_reason == NULL) {
291 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292 prefix, old_reason));
299 val.dptr = discard_const_p(uint8_t, new_reason);
300 val.dsize = strlen(new_reason);
302 ret = tdb_store(tdb, key, val, TDB_REPLACE);
304 tdb_transaction_cancel(tdb);
305 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306 tdb_name(tdb), ctdb_db->db_name, new_reason,
307 ret, tdb_errorstr(tdb)));
308 talloc_free(new_reason);
311 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312 ctdb_db->db_name, new_reason));
313 } else if (old_reason) {
314 ret = tdb_delete(tdb, key);
316 tdb_transaction_cancel(tdb);
317 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318 tdb_name(tdb), ctdb_db->db_name,
319 ret, tdb_errorstr(tdb)));
320 talloc_free(new_reason);
323 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
327 ret = tdb_transaction_commit(tdb);
328 if (ret != TDB_SUCCESS) {
329 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330 tdb_name(tdb), ret, tdb_errorstr(tdb)));
331 talloc_free(new_reason);
335 talloc_free(old_reason);
336 ctdb_db->unhealthy_reason = new_reason;
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342 struct ctdb_db_context *ctdb_db)
344 time_t now = time(NULL);
352 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354 "%04u%02u%02u%02u%02u%02u.0Z",
356 tm->tm_year+1900, tm->tm_mon+1,
357 tm->tm_mday, tm->tm_hour, tm->tm_min,
359 if (new_path == NULL) {
360 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
364 new_reason = talloc_asprintf(ctdb_db,
365 "ERROR - Backup of corrupted TDB in '%s'",
367 if (new_reason == NULL) {
368 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
371 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372 talloc_free(new_reason);
374 DEBUG(DEBUG_CRIT,(__location__
375 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
380 ret = rename(ctdb_db->db_path, new_path);
382 DEBUG(DEBUG_CRIT,(__location__
383 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384 ctdb_db->db_path, new_path,
385 errno, strerror(errno)));
386 talloc_free(new_path);
390 DEBUG(DEBUG_CRIT,(__location__
391 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392 ctdb_db->db_path, new_path));
393 talloc_free(new_path);
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
399 struct ctdb_db_context *ctdb_db;
404 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405 if (!ctdb_db->persistent) {
409 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
411 DEBUG(DEBUG_ALERT,(__location__
412 " load persistent health for '%s' failed\n",
417 if (ctdb_db->unhealthy_reason == NULL) {
419 DEBUG(DEBUG_INFO,(__location__
420 " persistent db '%s' healthy\n",
426 DEBUG(DEBUG_ALERT,(__location__
427 " persistent db '%s' unhealthy: %s\n",
429 ctdb_db->unhealthy_reason));
431 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
444 mark a database - as healthy
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
448 uint32_t db_id = *(uint32_t *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
451 bool may_recover = false;
453 ctdb_db = find_ctdb_db(ctdb, db_id);
455 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
459 if (ctdb_db->unhealthy_reason) {
463 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
465 DEBUG(DEBUG_ERR,(__location__
466 " ctdb_update_persistent_health(%s) failed\n",
471 if (may_recover && !ctdb->done_startup) {
472 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
474 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
484 uint32_t db_id = *(uint32_t *)indata.dptr;
485 struct ctdb_db_context *ctdb_db;
488 ctdb_db = find_ctdb_db(ctdb, db_id);
490 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
494 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
496 DEBUG(DEBUG_ERR,(__location__
497 " ctdb_load_persistent_health(%s) failed\n",
503 if (ctdb_db->unhealthy_reason) {
504 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
512 attach to a database, handling both persistent and non-persistent databases
513 return 0 on success, -1 on failure
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516 bool persistent, const char *unhealthy_reason)
518 struct ctdb_db_context *ctdb_db, *tmp_db;
523 int remaining_tries = 0;
525 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
526 CTDB_NO_MEMORY(ctdb, ctdb_db);
528 ctdb_db->priority = 1;
529 ctdb_db->ctdb = ctdb;
530 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
531 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
533 key.dsize = strlen(db_name)+1;
534 key.dptr = discard_const(db_name);
535 ctdb_db->db_id = ctdb_hash(&key);
536 ctdb_db->persistent = persistent;
538 /* check for hash collisions */
539 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
540 if (tmp_db->db_id == ctdb_db->db_id) {
541 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
542 tmp_db->db_id, db_name, tmp_db->db_name));
543 talloc_free(ctdb_db);
549 if (unhealthy_reason) {
550 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
551 unhealthy_reason, 0);
553 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
554 ctdb_db->db_name, unhealthy_reason, ret));
555 talloc_free(ctdb_db);
560 if (ctdb->max_persistent_check_errors > 0) {
563 if (ctdb->done_startup) {
567 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
569 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
570 ctdb_db->db_name, ret));
571 talloc_free(ctdb_db);
576 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
577 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
578 ctdb_db->db_name, ctdb_db->unhealthy_reason));
579 talloc_free(ctdb_db);
583 if (ctdb_db->unhealthy_reason) {
584 /* this is just a warning, but we want that in the log file! */
585 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
586 ctdb_db->db_name, ctdb_db->unhealthy_reason));
589 /* open the database */
590 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
591 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
594 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
595 if (ctdb->valgrinding) {
596 tdb_flags |= TDB_NOMMAP;
598 tdb_flags |= TDB_DISALLOW_NESTING;
601 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
602 ctdb->tunable.database_hash_size,
604 O_CREAT|O_RDWR, mode);
605 if (ctdb_db->ltdb == NULL) {
607 int saved_errno = errno;
610 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
613 strerror(saved_errno)));
614 talloc_free(ctdb_db);
618 if (remaining_tries == 0) {
619 DEBUG(DEBUG_CRIT,(__location__
620 "Failed to open persistent tdb '%s': %d - %s\n",
623 strerror(saved_errno)));
624 talloc_free(ctdb_db);
628 ret = stat(ctdb_db->db_path, &st);
630 DEBUG(DEBUG_CRIT,(__location__
631 "Failed to open persistent tdb '%s': %d - %s\n",
634 strerror(saved_errno)));
635 talloc_free(ctdb_db);
639 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
641 DEBUG(DEBUG_CRIT,(__location__
642 "Failed to open persistent tdb '%s': %d - %s\n",
645 strerror(saved_errno)));
646 talloc_free(ctdb_db);
656 ctdb_check_db_empty(ctdb_db);
658 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
663 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
664 ctdb_db->db_path, ret,
665 tdb_errorstr(ctdb_db->ltdb->tdb)));
666 if (remaining_tries == 0) {
667 talloc_free(ctdb_db);
671 fd = tdb_fd(ctdb_db->ltdb->tdb);
672 ret = fstat(fd, &st);
674 DEBUG(DEBUG_CRIT,(__location__
675 "Failed to fstat() persistent tdb '%s': %d - %s\n",
679 talloc_free(ctdb_db);
684 talloc_free(ctdb_db->ltdb);
685 ctdb_db->ltdb = NULL;
687 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
689 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
691 talloc_free(ctdb_db);
701 DLIST_ADD(ctdb->db_list, ctdb_db);
703 /* setting this can help some high churn databases */
704 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
707 all databases support the "null" function. we need this in
708 order to do forced migration of records
710 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
712 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
713 talloc_free(ctdb_db);
718 all databases support the "fetch" function. we need this
719 for efficient Samba3 ctdb fetch
721 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
723 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
724 talloc_free(ctdb_db);
728 ret = ctdb_vacuum_init(ctdb_db);
730 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
731 "database '%s'\n", ctdb_db->db_name));
732 talloc_free(ctdb_db);
737 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
745 a client has asked to attach a new database
747 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
748 TDB_DATA *outdata, uint64_t tdb_flags,
751 const char *db_name = (const char *)indata.dptr;
752 struct ctdb_db_context *db;
753 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
755 /* the client can optionally pass additional tdb flags, but we
756 only allow a subset of those on the database in ctdb. Note
757 that tdb_flags is passed in via the (otherwise unused)
758 srvid to the attach control */
759 tdb_flags &= TDB_NOSYNC;
761 /* If the node is inactive it is not part of the cluster
762 and we should not allow clients to attach to any
765 if (node->flags & NODE_FLAGS_INACTIVE) {
766 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
771 /* see if we already have this name */
772 db = ctdb_db_handle(ctdb, db_name);
774 outdata->dptr = (uint8_t *)&db->db_id;
775 outdata->dsize = sizeof(db->db_id);
776 tdb_add_flags(db->ltdb->tdb, tdb_flags);
780 if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
784 db = ctdb_db_handle(ctdb, db_name);
786 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
790 /* remember the flags the client has specified */
791 tdb_add_flags(db->ltdb->tdb, tdb_flags);
793 outdata->dptr = (uint8_t *)&db->db_id;
794 outdata->dsize = sizeof(db->db_id);
796 /* Try to ensure it's locked in mem */
797 ctdb_lockdown_memory(ctdb);
799 /* tell all the other nodes about this database */
800 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
801 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
802 CTDB_CONTROL_DB_ATTACH,
803 0, CTDB_CTRL_FLAG_NOREPLY,
812 attach to all existing persistent databases
814 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
815 const char *unhealthy_reason)
820 /* open the persistent db directory and scan it for files */
821 d = opendir(ctdb->db_directory_persistent);
826 while ((de=readdir(d))) {
828 size_t len = strlen(de->d_name);
830 int invalid_name = 0;
832 s = talloc_strdup(ctdb, de->d_name);
833 CTDB_NO_MEMORY(ctdb, s);
835 /* only accept names containing ".tdb." */
836 p = strstr(s, ".tdb.");
837 if (len < 7 || p == NULL) {
842 /* only accept names ending with .tdb. and any number of digits */
844 while (*q != 0 && invalid_name == 0) {
845 if (!isdigit(*q++)) {
849 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
850 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
856 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
857 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
863 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
871 int ctdb_attach_databases(struct ctdb_context *ctdb)
874 char *persistent_health_path = NULL;
875 char *unhealthy_reason = NULL;
876 bool first_try = true;
878 if (ctdb->db_directory == NULL) {
879 ctdb->db_directory = VARDIR "/ctdb";
881 if (ctdb->db_directory_persistent == NULL) {
882 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
884 if (ctdb->db_directory_state == NULL) {
885 ctdb->db_directory_state = VARDIR "/ctdb/state";
888 /* make sure the db directory exists */
889 ret = mkdir(ctdb->db_directory, 0700);
890 if (ret == -1 && errno != EEXIST) {
891 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
892 ctdb->db_directory));
896 /* make sure the persistent db directory exists */
897 ret = mkdir(ctdb->db_directory_persistent, 0700);
898 if (ret == -1 && errno != EEXIST) {
899 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
900 ctdb->db_directory_persistent));
904 /* make sure the internal state db directory exists */
905 ret = mkdir(ctdb->db_directory_state, 0700);
906 if (ret == -1 && errno != EEXIST) {
907 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
908 ctdb->db_directory_state));
912 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
913 ctdb->db_directory_state,
914 PERSISTENT_HEALTH_TDB,
916 if (persistent_health_path == NULL) {
917 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
923 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
924 0, TDB_DISALLOW_NESTING,
925 O_CREAT | O_RDWR, 0600);
926 if (ctdb->db_persistent_health == NULL) {
927 struct tdb_wrap *tdb;
930 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
931 persistent_health_path,
934 talloc_free(persistent_health_path);
935 talloc_free(unhealthy_reason);
940 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
941 persistent_health_path,
942 "was cleared after a failure",
943 "manual verification needed");
944 if (unhealthy_reason == NULL) {
945 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
946 talloc_free(persistent_health_path);
950 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
951 persistent_health_path));
952 tdb = tdb_wrap_open(ctdb, persistent_health_path,
953 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
954 O_CREAT | O_RDWR, 0600);
956 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
957 persistent_health_path,
960 talloc_free(persistent_health_path);
961 talloc_free(unhealthy_reason);
968 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
970 struct tdb_wrap *tdb;
972 talloc_free(ctdb->db_persistent_health);
973 ctdb->db_persistent_health = NULL;
976 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
977 persistent_health_path));
978 talloc_free(persistent_health_path);
979 talloc_free(unhealthy_reason);
984 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
985 persistent_health_path,
986 "was cleared after a failure",
987 "manual verification needed");
988 if (unhealthy_reason == NULL) {
989 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
990 talloc_free(persistent_health_path);
994 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
995 persistent_health_path));
996 tdb = tdb_wrap_open(ctdb, persistent_health_path,
997 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
998 O_CREAT | O_RDWR, 0600);
1000 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1001 persistent_health_path,
1004 talloc_free(persistent_health_path);
1005 talloc_free(unhealthy_reason);
1012 talloc_free(persistent_health_path);
1014 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1015 talloc_free(unhealthy_reason);
1024 called when a broadcast seqnum update comes in
1026 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1028 struct ctdb_db_context *ctdb_db;
1029 if (srcnode == ctdb->pnn) {
1030 /* don't update ourselves! */
1034 ctdb_db = find_ctdb_db(ctdb, db_id);
1036 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1040 if (ctdb_db->unhealthy_reason) {
1041 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1042 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1046 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1047 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1052 timer to check for seqnum changes in a ltdb and propagate them
1054 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1055 struct timeval t, void *p)
1057 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1058 struct ctdb_context *ctdb = ctdb_db->ctdb;
1059 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1060 if (new_seqnum != ctdb_db->seqnum) {
1061 /* something has changed - propagate it */
1063 data.dptr = (uint8_t *)&ctdb_db->db_id;
1064 data.dsize = sizeof(uint32_t);
1065 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1066 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1069 ctdb_db->seqnum = new_seqnum;
1071 /* setup a new timer */
1072 ctdb_db->seqnum_update =
1073 event_add_timed(ctdb->ev, ctdb_db,
1074 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1075 ctdb_ltdb_seqnum_check, ctdb_db);
1079 enable seqnum handling on this db
1081 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1083 struct ctdb_db_context *ctdb_db;
1084 ctdb_db = find_ctdb_db(ctdb, db_id);
1086 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1090 if (ctdb_db->seqnum_update == NULL) {
1091 ctdb_db->seqnum_update =
1092 event_add_timed(ctdb->ev, ctdb_db,
1093 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1094 ctdb_ltdb_seqnum_check, ctdb_db);
1097 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1098 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1102 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1104 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1105 struct ctdb_db_context *ctdb_db;
1107 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1109 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1113 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1114 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1118 ctdb_db->priority = db_prio->priority;
1119 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));