2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
/*
  Call handler for CTDB_NULL_FUNC. ctdb_local_attach() registers it on
  every database through ctdb_daemon_set_call().
  The body is not visible in this listing; presumably it does nothing
  and reports success - TODO confirm.
*/
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
/*
  Call handler for CTDB_FETCH_FUNC. ctdb_local_attach() registers it on
  every database through ctdb_daemon_set_call().
  The reply is the record as stored: reply_data is pointed at the call
  structure's own record_data, so nothing is copied here.
  The return statement is not visible in this listing.
*/
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 call->reply_data = &call->record_data;
/*
  State handed from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
  while a request waits for a chainlock.

  ctdb              - daemon context, used to read the current vnn_map generation
  recv_pkt          - function the deferred request header is passed back to
  hdr               - the deferred request; freed if the request is discarded
  ignore_generation - when true the callback skips the generation check

  The code also uses state->recv_context and state->generation; their
  declarations are not visible in this listing.
*/
53 struct lock_fetch_state {
54 struct ctdb_context *ctdb;
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
57 struct ctdb_req_header *hdr;
59 bool ignore_generation;
63 called when we should retry the operation
/*
  Callback given to ctdb_lockwait() by ctdb_ltdb_lock_requeue().

  p is the struct lock_fetch_state built for the deferred request.
  Unless ignore_generation is set, a request whose recorded generation
  differs from the current vnn_map generation is dropped: a notice is
  logged and the request header is freed (presumably followed by an
  early return, which is not visible in this listing - TODO confirm).
  Otherwise the header is handed to recv_pkt together with recv_context.
*/
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
/* re-submit the deferred request to the receive function chosen by the caller */
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we have the lock
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet receive function
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by another process
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
/*
  Parameters:
    ctdb_db           - database whose local tdb (ctdb_db->ltdb->tdb) is locked
    key               - key whose chain is locked
    hdr               - the request being processed; the retry state is
                        allocated as a talloc child of it, and once a
                        lockwait is set up hdr is moved onto that state
    recv_pkt          - receive function the request is re-submitted to
                        by lock_fetch_callback()
    recv_context      - first argument passed to recv_pkt
    ignore_generation - stored in the state; when true the callback does
                        not discard requests from an older generation

  Returns 0, -1 or -2 as described in the comment preceding this
  function. The return statements themselves are not visible in this
  listing.

  A failed tdb_chainlock_nonblock() is treated as contention only when
  errno is EACCES, EAGAIN or EDEADLK; any other errno is a hard failure.
*/
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
144 tdb_chainunlock(tdb, key);
148 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
149 so it won't be freed yet */
150 talloc_steal(state, hdr);
151 talloc_steal(state, h);
153 /* now tell the caller that we will retry asynchronously */
158 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
  Takes the chainlock through ctdb_ltdb_lock_requeue() and then reads
  the record with ctdb_ltdb_fetch().

  key, hdr, recv_pkt, recv_context and ignore_generation are passed
  straight through to ctdb_ltdb_lock_requeue(). header and data receive
  the ltdb header and record data from ctdb_ltdb_fetch(); hdr is also
  passed to the fetch as its fourth argument.

  The conditions guarding the fetch and the ctdb_ltdb_unlock() call are
  not visible in this listing; presumably the fetch runs only when the
  lock was obtained immediately and the unlock only when the fetch
  fails - TODO confirm. The return value is presumably that of the
  lock step or the fetch step.
*/
160 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
161 TDB_DATA key, struct ctdb_ltdb_header *header,
162 struct ctdb_req_header *hdr, TDB_DATA *data,
163 void (*recv_pkt)(void *, struct ctdb_req_header *),
164 void *recv_context, bool ignore_generation)
168 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
169 recv_context, ignore_generation);
171 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
173 ctdb_ltdb_unlock(ctdb_db, key);
181 paranoid check to see if the db is empty
/*
  Counts the records in the local tdb with tdb_traverse_read() (no
  callback is supplied, so only the count is used). A non-empty
  database is reported at alert level and ctdb_fatal() is called.
  The test on count is not visible in this listing; presumably the
  failure path runs when count is non-zero - TODO confirm.
*/
183 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
185 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
186 int count = tdb_traverse_read(tdb, NULL, NULL);
188 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
190 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
  Refreshes ctdb_db->unhealthy_reason from the persistent health tdb
  (ctdb->db_persistent_health).

  The lookup key is the database name without its terminating NUL
  (dsize is strlen of the name). The previous reason is remembered and
  the field is cleared before the fetch. A fetched value is copied with
  talloc_strndup() as a child of ctdb_db and becomes the new reason; if
  that allocation fails the previous reason is put back.

  Not visible in this listing: the test for whether a record was found,
  the release of the fetched value and of the previous reason, and the
  return values. Callers in this file treat the result as an error
  indicator - TODO confirm exact values.
*/
194 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
195 struct ctdb_db_context *ctdb_db)
197 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
203 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
204 key.dsize = strlen(ctdb_db->db_name);
206 old = ctdb_db->unhealthy_reason;
207 ctdb_db->unhealthy_reason = NULL;
209 val = tdb_fetch(tdb, key);
211 reason = talloc_strndup(ctdb_db,
212 (const char *)val.dptr,
214 if (reason == NULL) {
215 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
217 ctdb_db->unhealthy_reason = old;
228 ctdb_db->unhealthy_reason = reason;
/*
  Updates the health record of ctdb_db in the persistent health tdb
  inside a tdb transaction, then mirrors the result into
  ctdb_db->unhealthy_reason.

  Parameters:
    given_reason      - new unhealthy reason; NULL means healthy
    num_healthy_nodes - when 0 and an old reason exists, the old reason
                        is kept as the new reason, with the
                        NO-HEALTHY-NODES prefix added (presumably only
                        if it is not already present; the test on the
                        strncmp result is not visible - TODO confirm)

  Steps visible here:
    - start a transaction and reload the current reason with
      ctdb_load_persistent_health()
    - build new_reason as a talloc child of ctdb_db
    - if there is a new reason, tdb_store() it under the database name
      (key without the terminating NUL); otherwise, if there was an old
      reason, tdb_delete() the record
    - a failed store or delete cancels the transaction and frees
      new_reason
    - commit; on success the old reason is freed and new_reason becomes
      ctdb_db->unhealthy_reason

  Return values are not visible in this listing; callers in this file
  treat the result as an error indicator.
*/
232 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
233 struct ctdb_db_context *ctdb_db,
234 const char *given_reason,/* NULL means healthy */
235 int num_healthy_nodes)
237 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
241 char *new_reason = NULL;
242 char *old_reason = NULL;
244 ret = tdb_transaction_start(tdb);
246 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
247 tdb_name(tdb), ret, tdb_errorstr(tdb)));
251 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
253 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
254 ctdb_db->db_name, ret));
257 old_reason = ctdb_db->unhealthy_reason;
259 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
260 key.dsize = strlen(ctdb_db->db_name);
263 new_reason = talloc_strdup(ctdb_db, given_reason);
264 if (new_reason == NULL) {
265 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
269 } else if (old_reason && num_healthy_nodes == 0) {
271 * If the reason indicates ok, but there where no healthy nodes
272 * available, that it means, we have not recovered valid content
273 * of the db. So if there's an old reason, prefix it with
274 * "NO-HEALTHY-NODES - "
278 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
279 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
281 prefix = _TMP_PREFIX;
285 new_reason = talloc_asprintf(ctdb_db, "%s%s",
287 if (new_reason == NULL) {
288 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
289 prefix, old_reason));
296 val.dptr = discard_const_p(uint8_t, new_reason);
297 val.dsize = strlen(new_reason);
299 ret = tdb_store(tdb, key, val, TDB_REPLACE);
301 tdb_transaction_cancel(tdb);
302 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
303 tdb_name(tdb), ctdb_db->db_name, new_reason,
304 ret, tdb_errorstr(tdb)));
305 talloc_free(new_reason);
308 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
309 ctdb_db->db_name, new_reason));
310 } else if (old_reason) {
311 ret = tdb_delete(tdb, key);
313 tdb_transaction_cancel(tdb);
314 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
315 tdb_name(tdb), ctdb_db->db_name,
316 ret, tdb_errorstr(tdb)));
317 talloc_free(new_reason);
320 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
324 ret = tdb_transaction_commit(tdb);
325 if (ret != TDB_SUCCESS) {
326 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
327 tdb_name(tdb), ret, tdb_errorstr(tdb)));
328 talloc_free(new_reason);
332 talloc_free(old_reason);
333 ctdb_db->unhealthy_reason = new_reason;
/*
  Moves the database file at ctdb_db->db_path out of the way and
  records why in the persistent health tdb.

  Steps visible here:
    - build new_path from a .corrupted. suffix plus a timestamp made of
      year, month, day, hour, minute and one more field, ending in .0Z
      (the first and last format arguments, and how tm is derived from
      now, are not visible in this listing)
    - build an ERROR reason string and store it with
      ctdb_update_persistent_health(..., 0); the reason string is freed
      right after that call
    - rename() the file to new_path, logging errno on failure
    - new_path is freed on both the failure and the success path

  NOTE(review): the message logged after ctdb_update_persistent_health()
  says the function is not implemented yet, which looks stale given the
  rename below - confirm and reword.

  Return values are not visible in this listing; ctdb_local_attach()
  treats the result as an error indicator.
*/
338 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
339 struct ctdb_db_context *ctdb_db)
341 time_t now = time(NULL);
349 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
350 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
351 "%04u%02u%02u%02u%02u%02u.0Z",
353 tm->tm_year+1900, tm->tm_mon+1,
354 tm->tm_mday, tm->tm_hour, tm->tm_min,
356 if (new_path == NULL) {
357 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
361 new_reason = talloc_asprintf(ctdb_db,
362 "ERROR - Backup of corrupted TDB in '%s'",
364 if (new_reason == NULL) {
365 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
368 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
369 talloc_free(new_reason);
371 DEBUG(DEBUG_CRIT,(__location__
372 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
377 ret = rename(ctdb_db->db_path, new_path);
379 DEBUG(DEBUG_CRIT,(__location__
380 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
381 ctdb_db->db_path, new_path,
382 errno, strerror(errno)));
383 talloc_free(new_path);
387 DEBUG(DEBUG_CRIT,(__location__
388 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
389 ctdb_db->db_path, new_path));
390 talloc_free(new_path);
/*
  Walks ctdb->db_list and reloads the health state of every persistent
  database with ctdb_load_persistent_health(); non-persistent databases
  are skipped.

  A database with no unhealthy_reason is logged as healthy at info
  level, any other is logged as unhealthy at alert level. A summary
  with the OK and FAIL counts is logged at the end, at alert level when
  fail is non-zero and at notice level otherwise.

  The counter updates and the return values are not visible in this
  listing.
*/
394 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
396 struct ctdb_db_context *ctdb_db;
401 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
402 if (!ctdb_db->persistent) {
406 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
408 DEBUG(DEBUG_ALERT,(__location__
409 " load persistent health for '%s' failed\n",
414 if (ctdb_db->unhealthy_reason == NULL) {
416 DEBUG(DEBUG_INFO,(__location__
417 " persistent db '%s' healthy\n",
423 DEBUG(DEBUG_ALERT,(__location__
424 " persistent db '%s' unhealthy: %s\n",
426 ctdb_db->unhealthy_reason));
428 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
429 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
441 mark a database - as healthy
/*
  Control handler that clears the unhealthy state of one database.

  indata.dptr is read as a uint32_t database id and looked up with
  find_ctdb_db(). The health record is cleared by calling
  ctdb_update_persistent_health() with a NULL reason and one healthy
  node. If the database had an unhealthy reason beforehand (presumably
  this is what sets may_recover; the assignment is not visible) and
  startup has not finished, recovery_mode is set to
  CTDB_RECOVERY_ACTIVE to force a recovery.

  NOTE(review): no length check on indata is visible before it is
  dereferenced; presumably the control dispatcher validates the size -
  confirm against the caller.

  Return values are not visible in this listing.
*/
443 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
445 uint32_t db_id = *(uint32_t *)indata.dptr;
446 struct ctdb_db_context *ctdb_db;
448 bool may_recover = false;
450 ctdb_db = find_ctdb_db(ctdb, db_id);
452 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
456 if (ctdb_db->unhealthy_reason) {
460 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
462 DEBUG(DEBUG_ERR,(__location__
463 " ctdb_update_persistent_health(%s) failed\n",
468 if (may_recover && !ctdb->done_startup) {
469 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
471 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
  Control handler that reports the unhealthy reason of one database.

  indata.dptr is read as a uint32_t database id and looked up with
  find_ctdb_db(). The reason is reloaded with
  ctdb_load_persistent_health(). When a reason is set, outdata is
  pointed directly at ctdb_db->unhealthy_reason (no copy is made) and
  its size includes the terminating NUL.

  What outdata holds for a healthy database, and the return values, are
  not visible in this listing.

  NOTE(review): no length check on indata is visible before it is
  dereferenced; presumably the control dispatcher validates the size -
  confirm against the caller.
*/
477 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
481 uint32_t db_id = *(uint32_t *)indata.dptr;
482 struct ctdb_db_context *ctdb_db;
485 ctdb_db = find_ctdb_db(ctdb, db_id);
487 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
491 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
493 DEBUG(DEBUG_ERR,(__location__
494 " ctdb_load_persistent_health(%s) failed\n",
500 if (ctdb_db->unhealthy_reason) {
501 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
502 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
509 attach to a database, handling both persistent and non-persistent databases
510 return 0 on success, -1 on failure
/*
  Parameters:
    db_name          - database name; copied onto the new context
    persistent       - selects the persistent directory and tdb flags
    unhealthy_reason - when non-NULL it is written to the health tdb
                       with ctdb_update_persistent_health(..., 0) before
                       the database is opened (presumably only for
                       persistent databases; the enclosing test is not
                       visible in this listing)

  Steps visible here, in order:
    - allocate a zeroed ctdb_db_context under ctdb, priority 1
    - db_id is ctdb_hash() of the name including its terminating NUL
    - refuse the attach if another entry in ctdb->db_list has the same
      db_id
    - reload the stored health; an unhealthy database is refused when
      remaining_tries is 0, otherwise only a warning is logged.
      remaining_tries starts at 0; how max_persistent_check_errors and
      done_startup change it is not visible in this listing
    - db_path is built from the persistent or the normal db directory,
      the name and one numeric suffix
    - tdb flags: TDB_DEFAULT for persistent databases, otherwise
      TDB_CLEAR_IF_FIRST | TDB_NOSYNC; TDB_NOMMAP is added when
      do_setsched is off; TDB_DISALLOW_NESTING is added
    - open with tdb_wrap_open(); if the open fails and tries remain,
      the file is stat()ed and moved aside with
      ctdb_backup_corrupted_tdb()
    - ctdb_check_db_empty() and tdb_check() are run on the opened
      database (which of them applies to which kind of database is not
      visible here); after a failed tdb_check() with tries remaining,
      the tdb is closed and the file is moved aside
    - add the context to ctdb->db_list, set max dead records, register
      the null and fetch call handlers, start vacuuming

  Every visible failure path frees ctdb_db. The retry jumps and the
  return statements are not visible in this listing.
*/
512 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
513 bool persistent, const char *unhealthy_reason)
515 struct ctdb_db_context *ctdb_db, *tmp_db;
520 int remaining_tries = 0;
522 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
523 CTDB_NO_MEMORY(ctdb, ctdb_db);
525 ctdb_db->priority = 1;
526 ctdb_db->ctdb = ctdb;
527 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
528 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
530 key.dsize = strlen(db_name)+1;
531 key.dptr = discard_const(db_name);
532 ctdb_db->db_id = ctdb_hash(&key);
533 ctdb_db->persistent = persistent;
535 /* check for hash collisions */
536 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
537 if (tmp_db->db_id == ctdb_db->db_id) {
538 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
539 tmp_db->db_id, db_name, tmp_db->db_name));
540 talloc_free(ctdb_db);
546 if (unhealthy_reason) {
547 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
548 unhealthy_reason, 0);
550 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
551 ctdb_db->db_name, unhealthy_reason, ret));
552 talloc_free(ctdb_db);
557 if (ctdb->max_persistent_check_errors > 0) {
560 if (ctdb->done_startup) {
564 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
566 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
567 ctdb_db->db_name, ret));
568 talloc_free(ctdb_db);
573 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
574 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
575 ctdb_db->db_name, ctdb_db->unhealthy_reason));
576 talloc_free(ctdb_db);
580 if (ctdb_db->unhealthy_reason) {
581 /* this is just a warning, but we want that in the log file! */
582 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
583 ctdb_db->db_name, ctdb_db->unhealthy_reason));
586 /* open the database */
587 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
588 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
591 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
592 if (!ctdb->do_setsched) {
593 tdb_flags |= TDB_NOMMAP;
595 tdb_flags |= TDB_DISALLOW_NESTING;
598 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
599 ctdb->tunable.database_hash_size,
601 O_CREAT|O_RDWR, mode);
602 if (ctdb_db->ltdb == NULL) {
604 int saved_errno = errno;
607 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
610 strerror(saved_errno)));
611 talloc_free(ctdb_db);
615 if (remaining_tries == 0) {
616 DEBUG(DEBUG_CRIT,(__location__
617 "Failed to open persistent tdb '%s': %d - %s\n",
620 strerror(saved_errno)));
621 talloc_free(ctdb_db);
625 ret = stat(ctdb_db->db_path, &st);
627 DEBUG(DEBUG_CRIT,(__location__
628 "Failed to open persistent tdb '%s': %d - %s\n",
631 strerror(saved_errno)));
632 talloc_free(ctdb_db);
636 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
638 DEBUG(DEBUG_CRIT,(__location__
639 "Failed to open persistent tdb '%s': %d - %s\n",
642 strerror(saved_errno)));
643 talloc_free(ctdb_db);
653 ctdb_check_db_empty(ctdb_db);
655 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
660 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
661 ctdb_db->db_path, ret,
662 tdb_errorstr(ctdb_db->ltdb->tdb)));
663 if (remaining_tries == 0) {
664 talloc_free(ctdb_db);
668 fd = tdb_fd(ctdb_db->ltdb->tdb);
669 ret = fstat(fd, &st);
671 DEBUG(DEBUG_CRIT,(__location__
672 "Failed to fstat() persistent tdb '%s': %d - %s\n",
676 talloc_free(ctdb_db);
681 talloc_free(ctdb_db->ltdb);
682 ctdb_db->ltdb = NULL;
684 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
686 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
688 talloc_free(ctdb_db);
698 DLIST_ADD(ctdb->db_list, ctdb_db);
700 /* setting this can help some high churn databases */
701 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
704 all databases support the "null" function. we need this in
705 order to do forced migration of records
707 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
709 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
710 talloc_free(ctdb_db);
715 all databases support the "fetch" function. we need this
716 for efficient Samba3 ctdb fetch
718 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
720 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
721 talloc_free(ctdb_db);
725 ret = ctdb_vacuum_init(ctdb_db);
727 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
728 "database '%s'\n", ctdb_db->db_name));
729 talloc_free(ctdb_db);
734 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
742 a client has asked to attach a new database
/*
  Control handler for a client attach request.

  indata.dptr is used as the NUL-terminated database name. tdb_flags
  is masked down to TDB_NOSYNC, the only client-supplied flag honoured.

  Behaviour visible here:
    - the request is refused when this node has NODE_FLAGS_INACTIVE set
    - if a database with that name is already attached, the flags are
      added to it and its db_id is returned
    - otherwise the database is attached with ctdb_local_attach() (no
      unhealthy reason), looked up again, given the flags, and its
      db_id is returned
    - after a new attach, the matching attach control (persistent or
      not) is broadcast to all nodes with CTDB_CTRL_FLAG_NOREPLY

  outdata points directly at the db_id field of the database context;
  no copy is made. Return values are not visible in this listing.
*/
744 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
745 TDB_DATA *outdata, uint64_t tdb_flags,
748 const char *db_name = (const char *)indata.dptr;
749 struct ctdb_db_context *db;
750 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
752 /* the client can optionally pass additional tdb flags, but we
753 only allow a subset of those on the database in ctdb. Note
754 that tdb_flags is passed in via the (otherwise unused)
755 srvid to the attach control */
756 tdb_flags &= TDB_NOSYNC;
758 /* If the node is inactive it is not part of the cluster
759 and we should not allow clients to attach to any
762 if (node->flags & NODE_FLAGS_INACTIVE) {
763 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
768 /* see if we already have this name */
769 db = ctdb_db_handle(ctdb, db_name);
771 outdata->dptr = (uint8_t *)&db->db_id;
772 outdata->dsize = sizeof(db->db_id);
773 tdb_add_flags(db->ltdb->tdb, tdb_flags);
777 if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
781 db = ctdb_db_handle(ctdb, db_name);
783 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
787 /* remember the flags the client has specified */
788 tdb_add_flags(db->ltdb->tdb, tdb_flags);
790 outdata->dptr = (uint8_t *)&db->db_id;
791 outdata->dsize = sizeof(db->db_id);
793 /* tell all the other nodes about this database */
794 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
795 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
796 CTDB_CONTROL_DB_ATTACH,
797 0, CTDB_CTRL_FLAG_NOREPLY,
806 attach to all existing persistent databases
/*
  Scans ctdb->db_directory_persistent and attaches every database file
  that belongs to this node.

  For each directory entry the name is copied with talloc_strdup() and
  searched for the substring .tdb. with strstr(). Entries shorter than
  7 characters, without that substring, with a non-digit in the scanned
  suffix, or whose number parsed after .tdb. differs from ctdb->pnn are
  not attached. Accepted names are attached with ctdb_local_attach() as
  persistent, passing unhealthy_reason through.

  Not visible in this listing: where q starts, whether s is cut back to
  the bare database name before the attach, the freeing of s, closedir,
  and the return values.

  NOTE(review): isdigit() is called on a plain char; if char is signed
  and the file name has bytes above 0x7f the argument is out of range -
  consider casting to unsigned char.
*/
808 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
809 const char *unhealthy_reason)
814 /* open the persistent db directory and scan it for files */
815 d = opendir(ctdb->db_directory_persistent);
820 while ((de=readdir(d))) {
822 size_t len = strlen(de->d_name);
824 int invalid_name = 0;
826 s = talloc_strdup(ctdb, de->d_name);
827 CTDB_NO_MEMORY(ctdb, s);
829 /* only accept names ending in .tdb */
830 p = strstr(s, ".tdb.");
831 if (len < 7 || p == NULL) {
836 /* only accept names ending with .tdb. and any number of digits */
838 while (*q != 0 && invalid_name == 0) {
839 if (!isdigit(*q++)) {
843 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
844 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
850 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
851 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
857 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
  Prepares the database directories, opens the persistent health tdb
  and attaches all existing persistent databases.

  Steps visible here:
    - db_directory, db_directory_persistent and db_directory_state
      default to paths under VARDIR when unset
    - each directory is created with mode 0700; an existing directory
      (EEXIST) is not an error
    - the health tdb path is built from the state directory, the
      PERSISTENT_HEALTH_TDB name and a numeric suffix (the suffix
      argument is not visible in this listing)
    - the health tdb is opened with TDB_DISALLOW_NESTING and verified
      with tdb_check()
    - if the open or the check fails, an unhealthy reason saying that
      the health tdb was cleared is prepared and the file is reopened
      with TDB_CLEAR_IF_FIRST added. How first_try limits this to one
      retry is not visible here
    - ctdb_attach_persistent() is called with that reason, which is
      NULL when no retry happened, and the reason is freed afterwards

  Return values are not visible in this listing.
*/
865 int ctdb_attach_databases(struct ctdb_context *ctdb)
868 char *persistent_health_path = NULL;
869 char *unhealthy_reason = NULL;
870 bool first_try = true;
872 if (ctdb->db_directory == NULL) {
873 ctdb->db_directory = VARDIR "/ctdb";
875 if (ctdb->db_directory_persistent == NULL) {
876 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
878 if (ctdb->db_directory_state == NULL) {
879 ctdb->db_directory_state = VARDIR "/ctdb/state";
882 /* make sure the db directory exists */
883 ret = mkdir(ctdb->db_directory, 0700);
884 if (ret == -1 && errno != EEXIST) {
885 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
886 ctdb->db_directory));
890 /* make sure the persistent db directory exists */
891 ret = mkdir(ctdb->db_directory_persistent, 0700);
892 if (ret == -1 && errno != EEXIST) {
893 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
894 ctdb->db_directory_persistent));
898 /* make sure the internal state db directory exists */
899 ret = mkdir(ctdb->db_directory_state, 0700);
900 if (ret == -1 && errno != EEXIST) {
901 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
902 ctdb->db_directory_state));
906 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
907 ctdb->db_directory_state,
908 PERSISTENT_HEALTH_TDB,
910 if (persistent_health_path == NULL) {
911 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
917 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
918 0, TDB_DISALLOW_NESTING,
919 O_CREAT | O_RDWR, 0600);
920 if (ctdb->db_persistent_health == NULL) {
921 struct tdb_wrap *tdb;
924 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
925 persistent_health_path,
928 talloc_free(persistent_health_path);
929 talloc_free(unhealthy_reason);
934 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
935 persistent_health_path,
936 "was cleared after a failure",
937 "manual verification needed");
938 if (unhealthy_reason == NULL) {
939 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
940 talloc_free(persistent_health_path);
944 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
945 persistent_health_path));
946 tdb = tdb_wrap_open(ctdb, persistent_health_path,
947 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
948 O_CREAT | O_RDWR, 0600);
950 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
951 persistent_health_path,
954 talloc_free(persistent_health_path);
955 talloc_free(unhealthy_reason);
962 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
964 struct tdb_wrap *tdb;
966 talloc_free(ctdb->db_persistent_health);
967 ctdb->db_persistent_health = NULL;
970 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
971 persistent_health_path));
972 talloc_free(persistent_health_path);
973 talloc_free(unhealthy_reason);
978 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
979 persistent_health_path,
980 "was cleared after a failure",
981 "manual verification needed");
982 if (unhealthy_reason == NULL) {
983 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
984 talloc_free(persistent_health_path);
988 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
989 persistent_health_path));
990 tdb = tdb_wrap_open(ctdb, persistent_health_path,
991 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
992 O_CREAT | O_RDWR, 0600);
994 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
995 persistent_health_path,
998 talloc_free(persistent_health_path);
999 talloc_free(unhealthy_reason);
1006 talloc_free(persistent_health_path);
1008 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1009 talloc_free(unhealthy_reason);
1018 called when a broadcast seqnum update comes in
/*
  Handles a sequence number update for database db_id sent by srcnode.

  Updates that originate from this node (srcnode equal to ctdb->pnn)
  are not applied. For a known database the local tdb sequence number
  is bumped with tdb_increment_seqnum_nonblock() and the new value is
  cached in ctdb_db->seqnum.

  An unhealthy database is logged; whether the update is then skipped,
  and the return values, are not visible in this listing.
*/
1020 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1022 struct ctdb_db_context *ctdb_db;
1023 if (srcnode == ctdb->pnn) {
1024 /* don't update ourselves! */
1028 ctdb_db = find_ctdb_db(ctdb, db_id);
1030 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1034 if (ctdb_db->unhealthy_reason) {
1035 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1036 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1040 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1041 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1046 timer to check for seqnum changes in an ltdb and propagate them
/*
  Timed event handler; p is the ctdb_db_context being watched.

  The current tdb sequence number is compared with the cached
  ctdb_db->seqnum. When they differ, a CTDB_CONTROL_UPDATE_SEQNUM
  control carrying the db_id is sent to CTDB_BROADCAST_VNNMAP with
  CTDB_CTRL_FLAG_NOREPLY. The cached value is then set to the number
  just read.

  The handler re-arms itself: a new timed event is added with
  tunable.seqnum_interval split into whole seconds (interval/1000) and
  microseconds ((interval%1000)*1000), i.e. the tunable is treated as
  milliseconds.
*/
1048 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1049 struct timeval t, void *p)
1051 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1052 struct ctdb_context *ctdb = ctdb_db->ctdb;
1053 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1054 if (new_seqnum != ctdb_db->seqnum) {
1055 /* something has changed - propagate it */
1057 data.dptr = (uint8_t *)&ctdb_db->db_id;
1058 data.dsize = sizeof(uint32_t);
1059 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1060 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1063 ctdb_db->seqnum = new_seqnum;
1065 /* setup a new timer */
1066 ctdb_db->seqnum_update =
1067 event_add_timed(ctdb->ev, ctdb_db,
1068 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1069 ctdb_ltdb_seqnum_check, ctdb_db);
1073 enable seqnum handling on this db
/*
  Turns on sequence number tracking for database db_id.

  If no seqnum timer exists yet for the database, one is started with
  ctdb_ltdb_seqnum_check() as the handler and tunable.seqnum_interval
  (split into seconds and microseconds) as the delay. Sequence numbers
  are then enabled on the tdb and the current value is cached in
  ctdb_db->seqnum.

  Return values are not visible in this listing.
*/
1075 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1077 struct ctdb_db_context *ctdb_db;
1078 ctdb_db = find_ctdb_db(ctdb, db_id);
1080 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1084 if (ctdb_db->seqnum_update == NULL) {
1085 ctdb_db->seqnum_update =
1086 event_add_timed(ctdb->ev, ctdb_db,
1087 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1088 ctdb_ltdb_seqnum_check, ctdb_db);
1091 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1092 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
  Control handler that sets the priority of one database.

  indata.dptr is read as a struct ctdb_db_priority holding db_id and
  priority. The database is looked up with find_ctdb_db(). A priority
  below 1 or above NUM_DB_PRIORITIES is rejected with an error log;
  otherwise it is stored in ctdb_db->priority and logged at info level.

  NOTE(review): no length check on indata is visible before it is
  dereferenced; presumably the control dispatcher validates the size -
  confirm against the caller.

  Return values are not visible in this listing.
*/
1096 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1098 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1099 struct ctdb_db_context *ctdb_db;
1101 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1103 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1107 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1108 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1112 ctdb_db->priority = db_prio->priority;
1113 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));