2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
31 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34 this is the dummy null procedure that all databases support
/*
 * Call handler that ctdb_local_attach() registers under CTDB_NULL_FUNC.
 * NOTE(review): the body is not part of this extract (the embedded numbering
 * jumps from 36 to 42), so the value it returns cannot be confirmed from here.
 */
36 static int ctdb_null_func(struct ctdb_call_info *call)
42 this is a plain fetch procedure that all databases support
/*
 * Call handler that ctdb_local_attach() registers under CTDB_FETCH_FUNC.
 * The reply is the record itself.
 */
44 static int ctdb_fetch_func(struct ctdb_call_info *call)
/* reply_data is pointed at record_data inside the same call info; nothing is copied */
46 call->reply_data = &call->record_data;
/*
 * State kept while a request waits for a chainlock. It is filled in by
 * ctdb_ltdb_lock_requeue() and read back by lock_fetch_callback().
 * NOTE(review): both functions also use state->recv_context and
 * state->generation; the declarations of those two members are not visible
 * in this extract.
 */
52 struct lock_fetch_state {
/* daemon context; its vnn_map generation is compared in lock_fetch_callback() */
53 struct ctdb_context *ctdb;
/* handler the queued request is handed back to */
54 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the queued request itself */
56 struct ctdb_req_header *hdr;
/* when true, lock_fetch_callback() skips its generation comparison */
58 bool ignore_generation;
62 called when we should retry the operation
/*
 * Callback that ctdb_ltdb_lock_requeue() passes to ctdb_lockwait().
 * p is the lock_fetch_state built by ctdb_ltdb_lock_requeue().
 */
64 static void lock_fetch_callback(void *p)
66 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/* unless the requester asked to ignore it, a vnn map generation that differs
   from the one recorded when the request was queued marks the packet as stale */
67 if (!state->ignore_generation &&
68 state->generation != state->ctdb->vnn_map->generation) {
69 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
/* stale packet: free the request header instead of delivering it again */
70 talloc_free(state->hdr);
/* hand the queued request back to the handler supplied by the requester */
73 state->recv_pkt(state->recv_context, state->hdr);
74 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
79 do a non-blocking ltdb_lock, deferring this ctdb request until we
82 It does the following:
84 1) tries to get the chainlock. If it succeeds, then it returns 0
86 2) if it fails to get a chainlock immediately then it sets up a
87 non-blocking chainlock via ctdb_lockwait, and when it gets the
88 chainlock it re-submits this ctdb request to the main packet
91 This effectively queues all ctdb requests that cannot be
92 immediately satisfied until it can get the lock. This means that
93 the main ctdb daemon will not block waiting for a chainlock held by
96 There are 3 possible return values:
98 0: means that it got the lock immediately.
99 -1: means that it failed to get the lock, and won't retry
100 -2: means that it failed to get the lock immediately, but will retry
/*
 * Parameters:
 *   ctdb_db           - database whose local tdb chain is locked
 *   key               - record key selecting the chain
 *   hdr               - the request being processed; it becomes the talloc
 *                       parent of the wait state and is delivered again later
 *   recv_pkt          - handler lock_fetch_callback() passes the request to
 *   recv_context      - first argument given to recv_pkt
 *   ignore_generation - stored in the wait state; when true
 *                       lock_fetch_callback() skips its generation check
 */
102 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
103 TDB_DATA key, struct ctdb_req_header *hdr,
104 void (*recv_pkt)(void *, struct ctdb_req_header *),
105 void *recv_context, bool ignore_generation)
108 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
109 struct lockwait_handle *h;
110 struct lock_fetch_state *state;
/* try to take the chainlock without blocking the daemon */
112 ret = tdb_chainlock_nonblock(tdb, key);
115 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
116 /* a hard failure - don't try again */
120 /* when torturing, ensure we test the contended path */
121 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
124 tdb_chainunlock(tdb, key);
127 /* first the non-contended path */
/* NOTE(review): the result of this talloc() is dereferenced on the very next
   line; there is no NULL check on the allocation */
132 state = talloc(hdr, struct lock_fetch_state);
133 state->ctdb = ctdb_db->ctdb;
135 state->recv_pkt = recv_pkt;
136 state->recv_context = recv_context;
/* remember the generation at queue time so the callback can detect a change */
137 state->generation = ctdb_db->ctdb->vnn_map->generation;
138 state->ignore_generation = ignore_generation;
140 /* now the contended path */
141 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
/* NOTE(review): the condition guarding this unlock (original line 142) is not
   visible in this extract - presumably the ctdb_lockwait() failure path;
   confirm that the chainlock is actually held when this runs */
143 tdb_chainunlock(tdb, key);
147 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148 so it won't be freed yet */
/* state was allocated above with hdr as its talloc parent; from here on hdr
   and the lockwait handle are re-parented onto state */
149 talloc_steal(state, hdr);
150 talloc_steal(state, h);
152 /* now tell the caller that we will retry asynchronously */
157 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Takes the chainlock through ctdb_ltdb_lock_requeue() (same hdr, recv_pkt,
 * recv_context and ignore_generation arguments) and reads the record with
 * ctdb_ltdb_fetch() into *header and *data.
 * NOTE(review): the conditions between the three calls below are not visible
 * in this extract, so the exact return contract cannot be confirmed here.
 */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
160 TDB_DATA key, struct ctdb_ltdb_header *header,
161 struct ctdb_req_header *hdr, TDB_DATA *data,
162 void (*recv_pkt)(void *, struct ctdb_req_header *),
163 void *recv_context, bool ignore_generation)
167 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
168 recv_context, ignore_generation);
/* the lock result in ret is overwritten by the fetch result */
170 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
/* NOTE(review): presumably only reached when the fetch failed, so the lock is
   not left held on error - confirm against the full source */
172 ctdb_ltdb_unlock(ctdb_db, key);
180 paranoid check to see if the db is empty
/*
 * Called from ctdb_local_attach() after the tdb has been opened.
 * NOTE(review): the test on count is not visible in this extract; the log and
 * fatal messages show the abort is meant for a database that is not empty.
 */
182 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
184 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* traverse with no callback and no private data; only the returned count is used */
185 int count = tdb_traverse_read(tdb, NULL, NULL);
187 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
189 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
195 attach to a database, handling both persistent and non-persistent databases
196 return 0 on success, -1 on failure
/*
 * Parameters:
 *   ctdb             - daemon context; talloc parent of the new database context
 *   db_name          - database name; it is copied into the context and hashed
 *                      (terminating NUL included) to form db_id
 *   persistent       - selects ctdb->db_directory_persistent and TDB_DEFAULT
 *                      instead of ctdb->db_directory with
 *                      TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 *   unhealthy_reason - NOTE(review): not referenced by any line of this
 *                      function that is visible in this extract
 */
198 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
199 bool persistent, const char *unhealthy_reason)
201 struct ctdb_db_context *ctdb_db, *tmp_db;
206 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
207 CTDB_NO_MEMORY(ctdb, ctdb_db);
209 ctdb_db->priority = 1;
210 ctdb_db->ctdb = ctdb;
211 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
212 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the hash key is the name plus its terminating NUL */
214 key.dsize = strlen(db_name)+1;
215 key.dptr = discard_const(db_name);
216 ctdb_db->db_id = ctdb_hash(&key);
217 ctdb_db->persistent = persistent;
219 /* check for hash collisions */
220 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
221 if (tmp_db->db_id == ctdb_db->db_id) {
222 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
223 tmp_db->db_id, db_name, tmp_db->db_name));
224 talloc_free(ctdb_db);
229 /* open the database */
230 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
231 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
234 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
/* TDB_NOMMAP is added only when ctdb->do_setsched is false */
235 if (!ctdb->do_setsched) {
236 tdb_flags |= TDB_NOMMAP;
238 tdb_flags |= TDB_DISALLOW_NESTING;
/* NOTE(review): the first argument here is ctdb, not ctdb_db - presumably the
   talloc parent of the wrap; if so, talloc_free(ctdb_db) on the later failure
   paths does not by itself release the opened tdb - confirm */
240 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
241 ctdb->tunable.database_hash_size,
243 O_CREAT|O_RDWR, 0600);
244 if (ctdb_db->ltdb == NULL) {
245 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
246 talloc_free(ctdb_db);
251 ctdb_check_db_empty(ctdb_db);
/* link the new database into the daemon's list */
254 DLIST_ADD(ctdb->db_list, ctdb_db);
256 /* setting this can help some high churn databases */
257 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
260 all databases support the "null" function. we need this in
261 order to do forced migration of records
263 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
265 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
/* NOTE(review): ctdb_db was already linked into ctdb->db_list above and no
   DLIST_REMOVE is visible before this free (the same holds for the two
   failure paths below) - verify the list cannot keep a dangling entry */
266 talloc_free(ctdb_db);
271 all databases support the "fetch" function. we need this
272 for efficient Samba3 ctdb fetch
274 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
276 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
277 talloc_free(ctdb_db);
281 ret = ctdb_vacuum_init(ctdb_db);
283 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
284 "database '%s'\n", ctdb_db->db_name));
285 talloc_free(ctdb_db);
290 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
298 a client has asked to attach a new database
/*
 * Control handler: attach to the database named in indata and return its
 * db_id through outdata. An already known name is answered from the existing
 * handle; otherwise the database is attached locally and the attach is
 * broadcast to the other nodes.
 * NOTE(review): indata.dptr is used as a C string; no check that it is
 * NUL-terminated within indata.dsize is visible in this extract.
 * NOTE(review): the remainder of the parameter list (original line 302) is
 * not visible; the body uses a value named persistent.
 */
300 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
301 TDB_DATA *outdata, uint64_t tdb_flags,
304 const char *db_name = (const char *)indata.dptr;
305 struct ctdb_db_context *db;
306 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
308 /* the client can optionally pass additional tdb flags, but we
309 only allow a subset of those on the database in ctdb. Note
310 that tdb_flags is passed in via the (otherwise unused)
311 srvid to the attach control */
312 tdb_flags &= TDB_NOSYNC;
314 /* If the node is inactive it is not part of the cluster
315 and we should not allow clients to attach to any
318 if (node->flags & NODE_FLAGS_INACTIVE) {
319 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
324 /* see if we already have this name */
325 db = ctdb_db_handle(ctdb, db_name);
/* outdata points at the db_id stored inside the database context, not at a copy */
327 outdata->dptr = (uint8_t *)&db->db_id;
328 outdata->dsize = sizeof(db->db_id);
329 tdb_add_flags(db->ltdb->tdb, tdb_flags);
/* attaches requested by a client pass no unhealthy reason */
333 if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
/* look the handle up again now that the local attach has run */
337 db = ctdb_db_handle(ctdb, db_name);
339 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
343 /* remember the flags the client has specified */
344 tdb_add_flags(db->ltdb->tdb, tdb_flags);
346 outdata->dptr = (uint8_t *)&db->db_id;
347 outdata->dsize = sizeof(db->db_id);
349 /* tell all the other nodes about this database */
350 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
351 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
352 CTDB_CONTROL_DB_ATTACH,
353 0, CTDB_CTRL_FLAG_NOREPLY,
362 attach to all existing persistent databases
/*
 * Scans ctdb->db_directory_persistent and calls ctdb_local_attach() in
 * persistent mode for the entries that pass the name checks below.
 * unhealthy_reason is forwarded unchanged to every ctdb_local_attach() call.
 */
364 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
365 const char *unhealthy_reason)
370 /* open the persistent db directory and scan it for files */
371 d = opendir(ctdb->db_directory_persistent);
376 while ((de=readdir(d))) {
378 size_t len = strlen(de->d_name);
380 int invalid_name = 0;
/* NOTE(review): a copy of the entry name is allocated on ctdb for every
   directory entry; no release of it is visible in this extract - confirm
   that each path frees it */
382 s = talloc_strdup(ctdb, de->d_name);
383 CTDB_NO_MEMORY(ctdb, s);
385 /* names shorter than 7 characters or without a .tdb. part fail this test */
386 p = strstr(s, ".tdb.");
387 if (len < 7 || p == NULL) {
392 /* only accept names ending with .tdb. and any number of digits */
394 while (*q != 0 && invalid_name == 0) {
/* NOTE(review): assuming q is a plain char pointer, *q reaches isdigit()
   without a cast to unsigned char */
395 if (!isdigit(*q++)) {
/* the number parsed after .tdb. must equal this node's pnn */
399 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
400 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
406 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
407 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
413 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup step: fill in default database directories, create them, open and
 * tdb_check() the persistent health tdb, then attach to all persistent
 * databases through ctdb_attach_persistent().
 * When opening or checking the health tdb fails, an unhealthy_reason string is
 * built and the file is opened again with TDB_CLEAR_IF_FIRST; that reason is
 * what ctdb_attach_persistent() receives.
 * NOTE(review): the branch heads, labels and return statements of this
 * function are not visible in this extract, so the exact retry flow (see
 * first_try) and the return values cannot be confirmed from here.
 */
421 int ctdb_attach_databases(struct ctdb_context *ctdb)
424 char *persistent_health_path = NULL;
425 char *unhealthy_reason = NULL;
426 bool first_try = true;
/* fall back to compiled-in locations under VARDIR for unset directories */
428 if (ctdb->db_directory == NULL) {
429 ctdb->db_directory = VARDIR "/ctdb";
431 if (ctdb->db_directory_persistent == NULL) {
432 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
434 if (ctdb->db_directory_state == NULL) {
435 ctdb->db_directory_state = VARDIR "/ctdb/state";
438 /* make sure the db directory exists */
439 ret = mkdir(ctdb->db_directory, 0700);
/* an already existing directory is not an error */
440 if (ret == -1 && errno != EEXIST) {
441 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
442 ctdb->db_directory));
446 /* make sure the persistent db directory exists */
447 ret = mkdir(ctdb->db_directory_persistent, 0700);
448 if (ret == -1 && errno != EEXIST) {
449 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
450 ctdb->db_directory_persistent));
454 /* make sure the internal state db directory exists */
455 ret = mkdir(ctdb->db_directory_state, 0700);
456 if (ret == -1 && errno != EEXIST) {
457 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
458 ctdb->db_directory_state));
/* the health tdb lives in the state directory */
462 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
463 ctdb->db_directory_state,
464 PERSISTENT_HEALTH_TDB,
466 if (persistent_health_path == NULL) {
467 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* this open does not request TDB_CLEAR_IF_FIRST */
473 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
474 0, TDB_DISALLOW_NESTING,
475 O_CREAT | O_RDWR, 0600);
476 if (ctdb->db_persistent_health == NULL) {
477 struct tdb_wrap *tdb;
480 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
481 persistent_health_path,
484 talloc_free(persistent_health_path);
485 talloc_free(unhealthy_reason);
/* record why the persistent databases must be treated as unverified */
490 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
491 persistent_health_path,
492 "was cleared after a failure",
493 "manual verification needed");
494 if (unhealthy_reason == NULL) {
495 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
496 talloc_free(persistent_health_path);
500 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
501 persistent_health_path));
/* NOTE(review): the test applied to the result of this open (original line
   505) is not visible in this extract - confirm it checks for failure */
502 tdb = tdb_wrap_open(ctdb, persistent_health_path,
503 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
504 O_CREAT | O_RDWR, 0600);
506 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
507 persistent_health_path,
510 talloc_free(persistent_health_path);
511 talloc_free(unhealthy_reason);
/* check the opened health tdb with tdb_check(), passing no callback */
518 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
520 struct tdb_wrap *tdb;
/* drop the handle of the tdb that failed the check */
522 talloc_free(ctdb->db_persistent_health);
523 ctdb->db_persistent_health = NULL;
526 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
527 persistent_health_path));
528 talloc_free(persistent_health_path);
529 talloc_free(unhealthy_reason);
534 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
535 persistent_health_path,
536 "was cleared after a failure",
537 "manual verification needed");
538 if (unhealthy_reason == NULL) {
539 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
540 talloc_free(persistent_health_path);
544 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
545 persistent_health_path));
/* NOTE(review): as above, the test on this result (original line 549) is not
   visible in this extract */
546 tdb = tdb_wrap_open(ctdb, persistent_health_path,
547 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
548 O_CREAT | O_RDWR, 0600);
550 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
551 persistent_health_path,
554 talloc_free(persistent_health_path);
555 talloc_free(unhealthy_reason);
/* the path string is not used after this point */
562 talloc_free(persistent_health_path);
564 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
/* unhealthy_reason may still be NULL here when no failure was recorded */
565 talloc_free(unhealthy_reason);
574 called when a broadcast seqnum update comes in
/*
 * Handles a seqnum update for database db_id that was sent by node srcnode.
 * Updates that originate from this node itself (srcnode == ctdb->pnn) are
 * not applied.
 */
576 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
578 struct ctdb_db_context *ctdb_db;
579 if (srcnode == ctdb->pnn) {
580 /* don't update ourselves! */
584 ctdb_db = find_ctdb_db(ctdb, db_id);
586 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* bump the local tdb seqnum, then cache the resulting value in ctdb_db->seqnum */
590 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
591 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
596 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timed event handler. p is the ctdb_db_context the timer was registered for.
 * Compares the tdb seqnum with the value cached in ctdb_db->seqnum, broadcasts
 * a CTDB_CONTROL_UPDATE_SEQNUM control when they differ, and registers itself
 * again as a timed event.
 */
598 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
599 struct timeval t, void *p)
601 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
602 struct ctdb_context *ctdb = ctdb_db->ctdb;
603 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
604 if (new_seqnum != ctdb_db->seqnum) {
605 /* something has changed - propagate it */
/* the payload is only the db_id, not the new seqnum value */
607 data.dptr = (uint8_t *)&ctdb_db->db_id;
608 data.dsize = sizeof(uint32_t);
609 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
610 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
613 ctdb_db->seqnum = new_seqnum;
615 /* setup a new timer */
/* seqnum_interval is split into whole thousands and the remainder times 1000,
   i.e. presumably milliseconds turned into seconds plus microseconds */
616 ctdb_db->seqnum_update =
617 event_add_timed(ctdb->ev, ctdb_db,
618 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
619 ctdb_ltdb_seqnum_check, ctdb_db);
623 enable seqnum handling on this db
/*
 * Turns on seqnum tracking for database db_id: starts the periodic
 * ctdb_ltdb_seqnum_check() timer if none is registered yet, enables seqnum
 * handling in the tdb and caches the current seqnum.
 */
625 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
627 struct ctdb_db_context *ctdb_db;
628 ctdb_db = find_ctdb_db(ctdb, db_id);
630 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* a non-NULL seqnum_update means the timer is already registered */
634 if (ctdb_db->seqnum_update == NULL) {
635 ctdb_db->seqnum_update =
636 event_add_timed(ctdb->ev, ctdb_db,
637 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
638 ctdb_ltdb_seqnum_check, ctdb_db);
641 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
/* start from the current tdb value so the first timer run compares against it */
642 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: sets the priority of the database named by
 * db_prio->db_id. Priorities outside 1..NUM_DB_PRIORITIES are reported as
 * invalid.
 * NOTE(review): indata.dptr is reinterpreted as struct ctdb_db_priority and
 * read immediately; no check of indata.dsize is visible in this function -
 * confirm that the caller validates the payload size.
 */
646 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
648 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
649 struct ctdb_db_context *ctdb_db;
651 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
653 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
657 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
658 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
662 ctdb_db->priority = db_prio->priority;
663 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));