4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
31 find an attached ctdb_db handle given a name
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
35 struct ctdb_db_context *tmp_db;
36 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37 if (strcmp(name, tmp_db->db_name) == 0) {
/*
  the dummy "null" call that every database supports; it does not
  look at the call info and always returns 0
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;
	return 0;
}
54 this is a plain fetch procedure that all databases support
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
58 call->reply_data = &call->record_data;
64 return the lmaster given a key
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
68 uint32_t idx, lmaster;
70 idx = ctdb_hash(key) % ctdb->vnn_map->size;
71 lmaster = ctdb->vnn_map->map[idx];
78 construct an initial header for a record with no ltdb header yet
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
82 struct ctdb_ltdb_header *header)
85 /* initial dmaster is the lmaster */
86 header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87 header->laccessor = header->dmaster;
93 fetch a record from the ltdb, separating out the header information
94 and returning the body of the record. A valid (initial) header is
95 returned if the record is not present
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
98 TDB_DATA key, struct ctdb_ltdb_header *header,
99 TALLOC_CTX *mem_ctx, TDB_DATA *data)
102 struct ctdb_context *ctdb = ctdb_db->ctdb;
104 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105 if (rec.dsize < sizeof(*header)) {
107 /* return an initial header */
108 if (rec.dptr) free(rec.dptr);
109 if (ctdb->vnn_map == NULL) {
110 /* called from the client */
112 header->dmaster = (uint32_t)-1;
115 ltdb_initial_header(ctdb_db, key, header);
120 ctdb_ltdb_store(ctdb_db, key, header, d2);
124 *header = *(struct ctdb_ltdb_header *)rec.dptr;
127 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
128 data->dptr = talloc_memdup(mem_ctx,
129 sizeof(struct ctdb_ltdb_header)+rec.dptr,
135 CTDB_NO_MEMORY(ctdb, data->dptr);
143 fetch a record from the ltdb, separating out the header information
144 and returning the body of the record. A valid (initial) header is
145 returned if the record is not present
147 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
148 struct ctdb_ltdb_header *header, TDB_DATA data)
150 struct ctdb_context *ctdb = ctdb_db->ctdb;
154 if (ctdb->flags & CTDB_FLAG_TORTURE) {
155 struct ctdb_ltdb_header *h2;
156 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
157 h2 = (struct ctdb_ltdb_header *)rec.dptr;
158 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
159 DEBUG(0,("RSN regression! %llu %llu\n",
160 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
162 if (rec.dptr) free(rec.dptr);
165 rec.dsize = sizeof(*header) + data.dsize;
166 rec.dptr = talloc_size(ctdb, rec.dsize);
167 CTDB_NO_MEMORY(ctdb, rec.dptr);
169 memcpy(rec.dptr, header, sizeof(*header));
170 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
172 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
173 talloc_free(rec.dptr);
180 lock a record in the ltdb, given a key
182 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
184 return tdb_chainlock(ctdb_db->ltdb->tdb, key);
188 unlock a record in the ltdb, given a key
190 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
192 int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
194 DEBUG(0,("tdb_chainunlock failed\n"));
/*
  state carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
  while a request waits for a chainlock
 */
struct lock_fetch_state {
	/* daemon context, used to read the current vnn map generation */
	struct ctdb_context *ctdb;
	/* function the deferred packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn map generation at the time the request was queued */
	uint32_t generation;
	/* when true the packet is requeued even if the generation changed */
	bool ignore_generation;
};
209 called when we should retry the operation
211 static void lock_fetch_callback(void *p)
213 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
214 if (!state->ignore_generation &&
215 state->generation != state->ctdb->vnn_map->generation) {
216 DEBUG(0,("Discarding previous generation lockwait packet\n"));
217 talloc_free(state->hdr);
220 state->recv_pkt(state->recv_context, state->hdr);
221 DEBUG(2,(__location__ " PACKET REQUEUED\n"));
226 do a non-blocking ltdb_lock, deferring this ctdb request until we
229 It does the following:
231 1) tries to get the chainlock. If it succeeds, then it returns 0
233 2) if it fails to get a chainlock immediately then it sets up a
234 non-blocking chainlock via ctdb_lockwait, and when it gets the
235 chainlock it re-submits this ctdb request to the main packet
238 This effectively queues all ctdb requests that cannot be
239 immediately satisfied until it can get the lock. This means that
240 the main ctdb daemon will not block waiting for a chainlock held by
243 There are 3 possible return values:
245 0: means that it got the lock immediately.
246 -1: means that it failed to get the lock, and won't retry
247 -2: means that it failed to get the lock immediately, but will retry
249 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
250 TDB_DATA key, struct ctdb_req_header *hdr,
251 void (*recv_pkt)(void *, struct ctdb_req_header *),
252 void *recv_context, bool ignore_generation)
255 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
256 struct lockwait_handle *h;
257 struct lock_fetch_state *state;
259 ret = tdb_chainlock_nonblock(tdb, key);
262 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
263 /* a hard failure - don't try again */
267 /* when torturing, ensure we test the contended path */
268 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
271 tdb_chainunlock(tdb, key);
274 /* first the non-contended path */
279 state = talloc(hdr, struct lock_fetch_state);
280 state->ctdb = ctdb_db->ctdb;
282 state->recv_pkt = recv_pkt;
283 state->recv_context = recv_context;
284 state->generation = ctdb_db->ctdb->vnn_map->generation;
285 state->ignore_generation = ignore_generation;
287 /* now the contended path */
288 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
290 tdb_chainunlock(tdb, key);
294 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
295 so it won't be freed yet */
296 talloc_steal(state, hdr);
297 talloc_steal(state, h);
299 /* now tell the caller than we will retry asynchronously */
304 a varient of ctdb_ltdb_lock_requeue that also fetches the record
306 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
307 TDB_DATA key, struct ctdb_ltdb_header *header,
308 struct ctdb_req_header *hdr, TDB_DATA *data,
309 void (*recv_pkt)(void *, struct ctdb_req_header *),
310 void *recv_context, bool ignore_generation)
314 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
315 recv_context, ignore_generation);
317 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
319 ctdb_ltdb_unlock(ctdb_db, key);
327 paraoid check to see if the db is empty
329 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
331 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
332 int count = tdb_traverse_read(tdb, NULL, NULL);
334 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
336 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
341 a client has asked to attach a new database
343 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
346 const char *db_name = (const char *)indata.dptr;
347 struct ctdb_db_context *ctdb_db, *tmp_db;
350 /* see if we already have this name */
351 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
352 if (strcmp(db_name, tmp_db->db_name) == 0) {
353 /* this is not an error */
354 outdata->dptr = (uint8_t *)&tmp_db->db_id;
355 outdata->dsize = sizeof(tmp_db->db_id);
360 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
361 CTDB_NO_MEMORY(ctdb, ctdb_db);
363 ctdb_db->ctdb = ctdb;
364 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
365 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
367 ctdb_db->db_id = ctdb_hash(&indata);
369 outdata->dptr = (uint8_t *)&ctdb_db->db_id;
370 outdata->dsize = sizeof(ctdb_db->db_id);
372 /* check for hash collisions */
373 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
374 if (tmp_db->db_id == ctdb_db->db_id) {
375 DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
376 tmp_db->db_id, db_name, tmp_db->db_name));
377 talloc_free(ctdb_db);
382 if (ctdb->db_directory == NULL) {
383 ctdb->db_directory = VARDIR "/ctdb";
386 /* make sure the db directory exists */
387 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
388 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
389 ctdb->db_directory));
390 talloc_free(ctdb_db);
394 /* open the database */
395 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
399 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0,
400 TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
401 if (ctdb_db->ltdb == NULL) {
402 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
403 talloc_free(ctdb_db);
407 ctdb_check_db_empty(ctdb_db);
409 DLIST_ADD(ctdb->db_list, ctdb_db);
412 all databases support the "null" function. we need this in
413 order to do forced migration of records
415 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
417 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
418 talloc_free(ctdb_db);
423 all databases support the "fetch" function. we need this
424 for efficient Samba3 ctdb fetch
426 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
428 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
429 talloc_free(ctdb_db);
433 /* tell all the other nodes about this database */
434 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
435 CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
438 DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
445 called when a broadcast seqnum update comes in
447 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
449 struct ctdb_db_context *ctdb_db;
450 if (srcnode == ctdb->vnn) {
451 /* don't update ourselves! */
455 ctdb_db = find_ctdb_db(ctdb, db_id);
457 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
461 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
462 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
467 timer to check for seqnum changes in a ltdb and propogate them
469 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
470 struct timeval t, void *p)
472 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
473 struct ctdb_context *ctdb = ctdb_db->ctdb;
474 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
475 if (new_seqnum != ctdb_db->seqnum) {
476 /* something has changed - propogate it */
478 data.dptr = (uint8_t *)&ctdb_db->db_id;
479 data.dsize = sizeof(uint32_t);
480 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
481 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
484 ctdb_db->seqnum = new_seqnum;
486 /* setup a new timer */
488 event_add_timed(ctdb->ev, ctdb_db,
489 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
490 ctdb_ltdb_seqnum_check, ctdb_db);
494 enable seqnum handling on this db
496 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
498 struct ctdb_db_context *ctdb_db;
499 ctdb_db = find_ctdb_db(ctdb, db_id);
501 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
505 if (ctdb_db->te == NULL) {
507 event_add_timed(ctdb->ev, ctdb_db,
508 timeval_current_ofs(ctdb->tunable.seqnum_frequency, 0),
509 ctdb_ltdb_seqnum_check, ctdb_db);
512 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
513 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);