2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
31 this is the dummy null procedure that all databases support
33 static int ctdb_null_func(struct ctdb_call_info *call)
39 this is a plain fetch procedure that all databases support
41 static int ctdb_fetch_func(struct ctdb_call_info *call)
43 call->reply_data = &call->record_data;
49 struct lock_fetch_state {
50 struct ctdb_context *ctdb;
51 void (*recv_pkt)(void *, struct ctdb_req_header *);
53 struct ctdb_req_header *hdr;
55 bool ignore_generation;
59 called when we should retry the operation
61 static void lock_fetch_callback(void *p)
63 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
64 if (!state->ignore_generation &&
65 state->generation != state->ctdb->vnn_map->generation) {
66 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
67 talloc_free(state->hdr);
70 state->recv_pkt(state->recv_context, state->hdr);
71 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
76 do a non-blocking ltdb_lock, deferring this ctdb request until we
79 It does the following:
81 1) tries to get the chainlock. If it succeeds, then it returns 0
83 2) if it fails to get a chainlock immediately then it sets up a
84 non-blocking chainlock via ctdb_lockwait, and when it gets the
85 chainlock it re-submits this ctdb request to the main packet
88 This effectively queues all ctdb requests that cannot be
89 immediately satisfied until it can get the lock. This means that
90 the main ctdb daemon will not block waiting for a chainlock held by
93 There are 3 possible return values:
95 0: means that it got the lock immediately.
96 -1: means that it failed to get the lock, and won't retry
97 -2: means that it failed to get the lock immediately, but will retry
99 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
100 TDB_DATA key, struct ctdb_req_header *hdr,
101 void (*recv_pkt)(void *, struct ctdb_req_header *),
102 void *recv_context, bool ignore_generation)
105 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
106 struct lockwait_handle *h;
107 struct lock_fetch_state *state;
109 ret = tdb_chainlock_nonblock(tdb, key);
112 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
113 /* a hard failure - don't try again */
117 /* when torturing, ensure we test the contended path */
118 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
121 tdb_chainunlock(tdb, key);
124 /* first the non-contended path */
129 state = talloc(hdr, struct lock_fetch_state);
130 state->ctdb = ctdb_db->ctdb;
132 state->recv_pkt = recv_pkt;
133 state->recv_context = recv_context;
134 state->generation = ctdb_db->ctdb->vnn_map->generation;
135 state->ignore_generation = ignore_generation;
137 /* now the contended path */
138 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
140 tdb_chainunlock(tdb, key);
144 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
145 so it won't be freed yet */
146 talloc_steal(state, hdr);
147 talloc_steal(state, h);
149 /* now tell the caller that we will retry asynchronously */
154 a variant of ctdb_ltdb_lock_requeue that also fetches the record
156 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
157 TDB_DATA key, struct ctdb_ltdb_header *header,
158 struct ctdb_req_header *hdr, TDB_DATA *data,
159 void (*recv_pkt)(void *, struct ctdb_req_header *),
160 void *recv_context, bool ignore_generation)
164 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
165 recv_context, ignore_generation);
167 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
169 ctdb_ltdb_unlock(ctdb_db, key);
177 paranoid check to see if the db is empty
179 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
181 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
182 int count = tdb_traverse_read(tdb, NULL, NULL);
184 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
186 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
192 attach to a database, handling both persistent and non-persistent databases
193 return 0 on success, -1 on failure
195 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
197 struct ctdb_db_context *ctdb_db, *tmp_db;
202 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
203 CTDB_NO_MEMORY(ctdb, ctdb_db);
205 ctdb_db->priority = 1;
206 ctdb_db->ctdb = ctdb;
207 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
208 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
210 key.dsize = strlen(db_name)+1;
211 key.dptr = discard_const(db_name);
212 ctdb_db->db_id = ctdb_hash(&key);
213 ctdb_db->persistent = persistent;
215 /* check for hash collisions */
216 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
217 if (tmp_db->db_id == ctdb_db->db_id) {
218 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
219 tmp_db->db_id, db_name, tmp_db->db_name));
220 talloc_free(ctdb_db);
225 if (ctdb->db_directory == NULL) {
226 ctdb->db_directory = VARDIR "/ctdb";
229 /* make sure the db directory exists */
230 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
231 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
232 ctdb->db_directory));
233 talloc_free(ctdb_db);
237 if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
238 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
239 ctdb->db_directory_persistent));
240 talloc_free(ctdb_db);
244 /* open the database */
245 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
246 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
249 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
250 if (!ctdb->do_setsched) {
251 tdb_flags |= TDB_NOMMAP;
254 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
255 ctdb->tunable.database_hash_size,
257 O_CREAT|O_RDWR, 0666);
258 if (ctdb_db->ltdb == NULL) {
259 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s'\n", ctdb_db->db_path));
260 talloc_free(ctdb_db);
265 ctdb_check_db_empty(ctdb_db);
268 DLIST_ADD(ctdb->db_list, ctdb_db);
270 /* setting this can help some high churn databases */
271 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
274 all databases support the "null" function. we need this in
275 order to do forced migration of records
277 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
279 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
280 talloc_free(ctdb_db);
285 all databases support the "fetch" function. we need this
286 for efficient Samba3 ctdb fetch
288 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
290 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
291 talloc_free(ctdb_db);
295 ret = ctdb_vacuum_init(ctdb_db);
297 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for database '%s'\n", ctdb_db->db_name));
298 talloc_free(ctdb_db);
303 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
311 a client has asked to attach a new database
313 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
314 TDB_DATA *outdata, uint64_t tdb_flags,
317 const char *db_name = (const char *)indata.dptr;
318 struct ctdb_db_context *db;
319 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
321 /* the client can optionally pass additional tdb flags, but we
322 only allow a subset of those on the database in ctdb. Note
323 that tdb_flags is passed in via the (otherwise unused)
324 srvid to the attach control */
325 tdb_flags &= TDB_NOSYNC;
327 /* If the node is inactive it is not part of the cluster
328 and we should not allow clients to attach to any
331 if (node->flags & NODE_FLAGS_INACTIVE) {
332 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
337 /* see if we already have this name */
338 db = ctdb_db_handle(ctdb, db_name);
340 outdata->dptr = (uint8_t *)&db->db_id;
341 outdata->dsize = sizeof(db->db_id);
342 tdb_add_flags(db->ltdb->tdb, tdb_flags);
346 if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
350 db = ctdb_db_handle(ctdb, db_name);
352 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
356 /* remember the flags the client has specified */
357 tdb_add_flags(db->ltdb->tdb, tdb_flags);
359 outdata->dptr = (uint8_t *)&db->db_id;
360 outdata->dsize = sizeof(db->db_id);
362 /* tell all the other nodes about this database */
363 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
364 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
365 CTDB_CONTROL_DB_ATTACH,
366 0, CTDB_CTRL_FLAG_NOREPLY,
375 attach to all existing persistent databases
377 int ctdb_attach_persistent(struct ctdb_context *ctdb)
382 /* open the persistent db directory and scan it for files */
383 d = opendir(ctdb->db_directory_persistent);
388 while ((de=readdir(d))) {
390 size_t len = strlen(de->d_name);
393 s = talloc_strdup(ctdb, de->d_name);
394 CTDB_NO_MEMORY(ctdb, s);
396 /* ignore names ending in .bak */
397 p = strstr(s, ".bak");
402 /* only accept names ending in .tdb */
403 p = strstr(s, ".tdb.");
404 if (len < 7 || p == NULL) {
408 if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
414 if (ctdb_local_attach(ctdb, s, true) != 0) {
415 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
420 DEBUG(DEBUG_NOTICE,("Attached to persistent database %s\n", s));
429 called when a broadcast seqnum update comes in
431 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
433 struct ctdb_db_context *ctdb_db;
434 if (srcnode == ctdb->pnn) {
435 /* don't update ourselves! */
439 ctdb_db = find_ctdb_db(ctdb, db_id);
441 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
445 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
446 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
451 timer to check for seqnum changes in an ltdb and propagate them
453 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
454 struct timeval t, void *p)
456 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
457 struct ctdb_context *ctdb = ctdb_db->ctdb;
458 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
459 if (new_seqnum != ctdb_db->seqnum) {
460 /* something has changed - propagate it */
462 data.dptr = (uint8_t *)&ctdb_db->db_id;
463 data.dsize = sizeof(uint32_t);
464 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
465 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
468 ctdb_db->seqnum = new_seqnum;
470 /* setup a new timer */
472 event_add_timed(ctdb->ev, ctdb_db,
473 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
474 ctdb_ltdb_seqnum_check, ctdb_db);
478 enable seqnum handling on this db
480 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
482 struct ctdb_db_context *ctdb_db;
483 ctdb_db = find_ctdb_db(ctdb, db_id);
485 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
489 if (ctdb_db->te == NULL) {
491 event_add_timed(ctdb->ev, ctdb_db,
492 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
493 ctdb_ltdb_seqnum_check, ctdb_db);
496 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
497 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
501 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
503 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
504 struct ctdb_db_context *ctdb_db;
506 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
508 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
512 ctdb_db->priority = db_prio->priority;
513 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));