/*
   ctdb ltdb code - server side

   Copyright (C) Andrew Tridgell  2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
/*
  this is the dummy null procedure that all databases support.
  It ignores its argument and always reports success (0).
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;	/* deliberately unused */
	return 0;
}
40 this is a plain fetch procedure that all databases support
42 static int ctdb_fetch_func(struct ctdb_call_info *call)
44 call->reply_data = &call->record_data;
/*
  state kept while a request waits for a contended chainlock, so that
  the request can be handed back to its receive function later
*/
struct lock_fetch_state {
	struct ctdb_context *ctdb;
	/* function the deferred packet is re-submitted to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* opaque first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn_map generation at the time the request was deferred */
	uint32_t generation;
	/* when true, re-submit even if the generation has since changed */
	bool ignore_generation;
};
60 called when we should retry the operation
62 static void lock_fetch_callback(void *p)
64 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
65 if (!state->ignore_generation &&
66 state->generation != state->ctdb->vnn_map->generation) {
67 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
68 talloc_free(state->hdr);
71 state->recv_pkt(state->recv_context, state->hdr);
72 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
77 do a non-blocking ltdb_lock, deferring this ctdb request until we
80 It does the following:
82 1) tries to get the chainlock. If it succeeds, then it returns 0
84 2) if it fails to get a chainlock immediately then it sets up a
85 non-blocking chainlock via ctdb_lockwait, and when it gets the
86 chainlock it re-submits this ctdb request to the main packet
89 This effectively queues all ctdb requests that cannot be
90 immediately satisfied until it can get the lock. This means that
91 the main ctdb daemon will not block waiting for a chainlock held by
94 There are 3 possible return values:
96 0: means that it got the lock immediately.
97 -1: means that it failed to get the lock, and won't retry
98 -2: means that it failed to get the lock immediately, but will retry
100 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
101 TDB_DATA key, struct ctdb_req_header *hdr,
102 void (*recv_pkt)(void *, struct ctdb_req_header *),
103 void *recv_context, bool ignore_generation)
106 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
107 struct lockwait_handle *h;
108 struct lock_fetch_state *state;
110 ret = tdb_chainlock_nonblock(tdb, key);
113 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
114 /* a hard failure - don't try again */
118 /* when torturing, ensure we test the contended path */
119 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
122 tdb_chainunlock(tdb, key);
125 /* first the non-contended path */
130 state = talloc(hdr, struct lock_fetch_state);
131 state->ctdb = ctdb_db->ctdb;
133 state->recv_pkt = recv_pkt;
134 state->recv_context = recv_context;
135 state->generation = ctdb_db->ctdb->vnn_map->generation;
136 state->ignore_generation = ignore_generation;
138 /* now the contended path */
139 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
141 tdb_chainunlock(tdb, key);
145 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
146 so it won't be freed yet */
147 talloc_steal(state, hdr);
148 talloc_steal(state, h);
150 /* now tell the caller than we will retry asynchronously */
155 a varient of ctdb_ltdb_lock_requeue that also fetches the record
157 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
158 TDB_DATA key, struct ctdb_ltdb_header *header,
159 struct ctdb_req_header *hdr, TDB_DATA *data,
160 void (*recv_pkt)(void *, struct ctdb_req_header *),
161 void *recv_context, bool ignore_generation)
165 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
166 recv_context, ignore_generation);
168 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
170 ctdb_ltdb_unlock(ctdb_db, key);
178 paraoid check to see if the db is empty
180 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
182 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
183 int count = tdb_traverse_read(tdb, NULL, NULL);
185 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
187 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
193 attach to a database, handling both persistent and non-persistent databases
194 return 0 on success, -1 on failure
196 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
198 struct ctdb_db_context *ctdb_db, *tmp_db;
203 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
204 CTDB_NO_MEMORY(ctdb, ctdb_db);
206 ctdb_db->priority = 1;
207 ctdb_db->ctdb = ctdb;
208 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
209 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
211 key.dsize = strlen(db_name)+1;
212 key.dptr = discard_const(db_name);
213 ctdb_db->db_id = ctdb_hash(&key);
214 ctdb_db->persistent = persistent;
216 /* check for hash collisions */
217 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
218 if (tmp_db->db_id == ctdb_db->db_id) {
219 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
220 tmp_db->db_id, db_name, tmp_db->db_name));
221 talloc_free(ctdb_db);
226 if (ctdb->db_directory == NULL) {
227 ctdb->db_directory = VARDIR "/ctdb";
230 /* make sure the db directory exists */
231 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
232 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
233 ctdb->db_directory));
234 talloc_free(ctdb_db);
238 if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
239 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
240 ctdb->db_directory_persistent));
241 talloc_free(ctdb_db);
245 /* open the database */
246 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
247 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
250 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
251 if (!ctdb->do_setsched) {
252 tdb_flags |= TDB_NOMMAP;
254 tdb_flags |= TDB_DISALLOW_NESTING;
256 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
257 ctdb->tunable.database_hash_size,
259 O_CREAT|O_RDWR, 0666);
260 if (ctdb_db->ltdb == NULL) {
261 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
262 talloc_free(ctdb_db);
267 ctdb_check_db_empty(ctdb_db);
270 DLIST_ADD(ctdb->db_list, ctdb_db);
272 /* setting this can help some high churn databases */
273 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
276 all databases support the "null" function. we need this in
277 order to do forced migration of records
279 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
281 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
282 talloc_free(ctdb_db);
287 all databases support the "fetch" function. we need this
288 for efficient Samba3 ctdb fetch
290 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
292 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
293 talloc_free(ctdb_db);
297 ret = ctdb_vacuum_init(ctdb_db);
299 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
300 "database '%s'\n", ctdb_db->db_name));
301 talloc_free(ctdb_db);
306 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
314 a client has asked to attach a new database
316 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
317 TDB_DATA *outdata, uint64_t tdb_flags,
320 const char *db_name = (const char *)indata.dptr;
321 struct ctdb_db_context *db;
322 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
324 /* the client can optionally pass additional tdb flags, but we
325 only allow a subset of those on the database in ctdb. Note
326 that tdb_flags is passed in via the (otherwise unused)
327 srvid to the attach control */
328 tdb_flags &= TDB_NOSYNC;
330 /* If the node is inactive it is not part of the cluster
331 and we should not allow clients to attach to any
334 if (node->flags & NODE_FLAGS_INACTIVE) {
335 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
340 /* see if we already have this name */
341 db = ctdb_db_handle(ctdb, db_name);
343 outdata->dptr = (uint8_t *)&db->db_id;
344 outdata->dsize = sizeof(db->db_id);
345 tdb_add_flags(db->ltdb->tdb, tdb_flags);
349 if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
353 db = ctdb_db_handle(ctdb, db_name);
355 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
359 /* remember the flags the client has specified */
360 tdb_add_flags(db->ltdb->tdb, tdb_flags);
362 outdata->dptr = (uint8_t *)&db->db_id;
363 outdata->dsize = sizeof(db->db_id);
365 /* tell all the other nodes about this database */
366 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
367 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
368 CTDB_CONTROL_DB_ATTACH,
369 0, CTDB_CTRL_FLAG_NOREPLY,
378 attach to all existing persistent databases
380 int ctdb_attach_persistent(struct ctdb_context *ctdb)
385 /* open the persistent db directory and scan it for files */
386 d = opendir(ctdb->db_directory_persistent);
391 while ((de=readdir(d))) {
393 size_t len = strlen(de->d_name);
395 int invalid_name = 0;
397 s = talloc_strdup(ctdb, de->d_name);
398 CTDB_NO_MEMORY(ctdb, s);
400 /* only accept names ending in .tdb */
401 p = strstr(s, ".tdb.");
402 if (len < 7 || p == NULL) {
407 /* only accept names ending with .tdb. and any number of digits */
409 while (*q != 0 && invalid_name == 0) {
410 if (!isdigit(*q++)) {
414 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
415 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
421 if (ctdb_local_attach(ctdb, s, true) != 0) {
422 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
427 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
436 called when a broadcast seqnum update comes in
438 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
440 struct ctdb_db_context *ctdb_db;
441 if (srcnode == ctdb->pnn) {
442 /* don't update ourselves! */
446 ctdb_db = find_ctdb_db(ctdb, db_id);
448 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
452 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
453 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
458 timer to check for seqnum changes in a ltdb and propogate them
460 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
461 struct timeval t, void *p)
463 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
464 struct ctdb_context *ctdb = ctdb_db->ctdb;
465 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
466 if (new_seqnum != ctdb_db->seqnum) {
467 /* something has changed - propogate it */
469 data.dptr = (uint8_t *)&ctdb_db->db_id;
470 data.dsize = sizeof(uint32_t);
471 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
472 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
475 ctdb_db->seqnum = new_seqnum;
477 /* setup a new timer */
478 ctdb_db->seqnum_update =
479 event_add_timed(ctdb->ev, ctdb_db,
480 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
481 ctdb_ltdb_seqnum_check, ctdb_db);
485 enable seqnum handling on this db
487 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
489 struct ctdb_db_context *ctdb_db;
490 ctdb_db = find_ctdb_db(ctdb, db_id);
492 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
496 if (ctdb_db->seqnum_update == NULL) {
497 ctdb_db->seqnum_update =
498 event_add_timed(ctdb->ev, ctdb_db,
499 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
500 ctdb_ltdb_seqnum_check, ctdb_db);
503 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
504 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
508 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
510 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
511 struct ctdb_db_context *ctdb_db;
513 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
515 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
519 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
520 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
524 ctdb_db->priority = db_prio->priority;
525 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));