4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
31 find an attached ctdb_db handle given a name
/* walks ctdb->db_list and compares each entry's db_name against name
   with strcmp().
   NOTE(review): the lines that return the match (and the not-found
   result) are absent from this view - confirm against the full file */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
35 struct ctdb_db_context *tmp_db;
36 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37 if (strcmp(name, tmp_db->db_name) == 0) {
46 this is the dummy null procedure that all databases support
/* registered for every database under CTDB_NULL_FUNC by
   ctdb_control_db_attach().
   NOTE(review): the body is not visible in this view */
48 static int ctdb_null_func(struct ctdb_call_info *call)
54 this is a plain fetch procedure that all databases support
/* registered for every database under CTDB_FETCH_FUNC by
   ctdb_control_db_attach() */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
/* the reply aliases the record data held in the call info; no copy is
   made here */
58 call->reply_data = &call->record_data;
64 return the lmaster given a key
/* hashes the key with ctdb_hash(), reduces the hash modulo
   vnn_map->size to pick a slot, and reads the lmaster from
   vnn_map->map[] at that slot.
   NOTE(review): nothing visible here guards against vnn_map->size
   being 0, which would make the modulo undefined - confirm that
   callers guarantee a non-empty map */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
68 uint32_t idx, lmaster;
70 idx = ctdb_hash(key) % ctdb->vnn_map->size;
71 lmaster = ctdb->vnn_map->map[idx];
78 construct an initial header for a record with no ltdb header yet
/* fills in *header for a key that has no stored header: dmaster is
   the lmaster computed for the key, and laccessor is set to the same
   node.
   NOTE(review): original lines 81, 83-84 and 88-89 are missing from
   this view (one parameter and possibly further field initialisers),
   so the full set of fields written here cannot be confirmed */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
82 struct ctdb_ltdb_header *header)
85 /* initial dmaster is the lmaster */
86 header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
87 header->laccessor = header->dmaster;
93 fetch a record from the ltdb, separating out the header information
94 and returning the body of the record. A valid (initial) header is
95 returned if the record is not present
/* reads the raw record with tdb_fetch(). A record shorter than a
   ctdb_ltdb_header is treated as absent: an initial header is built
   and written back with ctdb_ltdb_store(). Otherwise the leading
   header bytes are copied into *header and the rest of the record is
   duplicated onto mem_ctx with talloc_memdup() for *data.
   NOTE(review): several lines are missing from this view, including
   the declarations of rec and d2, the guards around the data accesses
   and the return statements */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
98 TDB_DATA key, struct ctdb_ltdb_header *header,
99 TALLOC_CTX *mem_ctx, TDB_DATA *data)
102 struct ctdb_context *ctdb = ctdb_db->ctdb;
104 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
105 if (rec.dsize < sizeof(*header)) {
107 /* return an initial header */
/* the buffer handed back by tdb_fetch() is released with free(), not
   with talloc_free() */
108 if (rec.dptr) free(rec.dptr);
109 ltdb_initial_header(ctdb_db, key, header);
/* NOTE(review): the result of this store is not checked */
114 ctdb_ltdb_store(ctdb_db, key, header, d2);
/* the stored record starts with the header; the body follows it */
118 *header = *(struct ctdb_ltdb_header *)rec.dptr;
121 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
122 data->dptr = talloc_memdup(mem_ctx,
/* source pointer: skip over the header to the start of the body */
123 sizeof(struct ctdb_ltdb_header)+rec.dptr,
129 CTDB_NO_MEMORY(ctdb, data->dptr);
137 fetch a record from the ltdb, separating out the header information
138 and returning the body of the record. A valid (initial) header is
139 returned if the record is not present
141 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
142 struct ctdb_ltdb_header *header, TDB_DATA data)
144 struct ctdb_context *ctdb = ctdb_db->ctdb;
148 if (ctdb->flags & CTDB_FLAG_TORTURE) {
149 struct ctdb_ltdb_header *h2;
150 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
151 h2 = (struct ctdb_ltdb_header *)rec.dptr;
152 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
153 DEBUG(0,("RSN regression! %llu %llu\n",
154 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
156 if (rec.dptr) free(rec.dptr);
159 rec.dsize = sizeof(*header) + data.dsize;
160 rec.dptr = talloc_size(ctdb, rec.dsize);
161 CTDB_NO_MEMORY(ctdb, rec.dptr);
163 memcpy(rec.dptr, header, sizeof(*header));
164 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
166 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
167 talloc_free(rec.dptr);
174 lock a record in the ltdb, given a key
/* thin wrapper: takes the tdb chain lock for key with tdb_chainlock()
   and returns its result unchanged */
176 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
178 return tdb_chainlock(ctdb_db->ltdb->tdb, key);
182 unlock a record in the ltdb, given a key
/* releases the tdb chain lock for key with tdb_chainunlock().
   NOTE(review): the condition guarding the log line and the return
   statement are missing from this view; presumably the message is
   only emitted when ret is non-zero and ret is returned - verify */
184 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
186 int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
188 DEBUG(0,("tdb_chainunlock failed\n"));
/* state handed from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
   while a request waits for a chain lock.
   NOTE(review): the recv_context and generation members used by those
   functions are on lines missing from this view */
193 struct lock_fetch_state {
194 struct ctdb_context *ctdb;
/* handler the deferred packet is handed back to */
195 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request itself */
197 struct ctdb_req_header *hdr;
/* when true the callback skips the vnn_map generation comparison */
199 bool ignore_generation;
203 called when we should retry the operation
/* callback given to ctdb_lockwait() by ctdb_ltdb_lock_requeue(); p is
   the lock_fetch_state set up there.
   NOTE(review): the line following the discard (presumably an early
   return) is missing from this view */
205 static void lock_fetch_callback(void *p)
207 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/* unless the caller opted out, a packet queued under a different
   vnn_map generation is dropped and its header freed */
208 if (!state->ignore_generation &&
209 state->generation != state->ctdb->vnn_map->generation) {
210 DEBUG(0,("Discarding previous generation lockwait packet\n"));
211 talloc_free(state->hdr);
/* hand the deferred packet back to the handler that queued it */
214 state->recv_pkt(state->recv_context, state->hdr);
215 DEBUG(2,(__location__ " PACKET REQUEUED\n"));
220 do a non-blocking ltdb_lock, deferring this ctdb request until we
223 It does the following:
225 1) tries to get the chainlock. If it succeeds, then it returns 0
227 2) if it fails to get a chainlock immediately then it sets up a
228 non-blocking chainlock via ctdb_lockwait, and when it gets the
229 chainlock it re-submits this ctdb request to the main packet
232 This effectively queues all ctdb requests that cannot be
233 immediately satisfied until it can get the lock. This means that
234 the main ctdb daemon will not block waiting for a chainlock held by
237 There are 3 possible return values:
239 0: means that it got the lock immediately.
240 -1: means that it failed to get the lock, and won't retry
241 -2: means that it failed to get the lock immediately, but will retry
/* tries tdb_chainlock_nonblock() on key; when the lock is contended
   the request is parked on a lock_fetch_state and ctdb_lockwait() is
   asked to run lock_fetch_callback(), which hands hdr back to
   recv_pkt with recv_context.
   NOTE(review): several lines are missing from this view (the
   declaration of ret, parts of the conditions and the return
   statements), so the exact branch structure cannot be confirmed */
243 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
244 TDB_DATA key, struct ctdb_req_header *hdr,
245 void (*recv_pkt)(void *, struct ctdb_req_header *),
246 void *recv_context, bool ignore_generation)
249 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
250 struct lockwait_handle *h;
251 struct lock_fetch_state *state;
253 ret = tdb_chainlock_nonblock(tdb, key);
/* EACCES, EAGAIN and EDEADLK are the errno values treated as
   contention; anything else is a hard failure */
256 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
257 /* a hard failure - don't try again */
261 /* when torturing, ensure we test the contended path */
262 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
265 tdb_chainunlock(tdb, key);
268 /* first the non-contended path */
/* NOTE(review): the talloc() result is dereferenced on the next line
   without a NULL check */
273 state = talloc(hdr, struct lock_fetch_state);
274 state->ctdb = ctdb_db->ctdb;
276 state->recv_pkt = recv_pkt;
277 state->recv_context = recv_context;
/* remember the generation so that the callback can detect a change */
278 state->generation = ctdb_db->ctdb->vnn_map->generation;
279 state->ignore_generation = ignore_generation;
281 /* now the contended path */
282 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
/* NOTE(review): presumably the ctdb_lockwait() failure path (its
   condition line is missing). The chain lock does not appear to be
   held at this point - verify that this unlock is intended */
284 tdb_chainunlock(tdb, key);
288 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
289 so it won't be freed yet */
/* NOTE(review): state was allocated as a child of hdr above and hdr
   is now made a child of state - confirm the resulting talloc
   hierarchy is what is intended */
290 talloc_steal(state, hdr);
291 talloc_steal(state, h);
293 /* now tell the caller that we will retry asynchronously */
298 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/* calls ctdb_ltdb_lock_requeue() with the same key, hdr, recv_pkt,
   recv_context and ignore_generation, then ctdb_ltdb_fetch() with hdr
   as the talloc parent for the returned data.
   NOTE(review): the conditions between the calls are missing from
   this view; presumably the fetch only runs when the lock was taken
   and the unlock below is the fetch-failure path - verify */
300 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
301 TDB_DATA key, struct ctdb_ltdb_header *header,
302 struct ctdb_req_header *hdr, TDB_DATA *data,
303 void (*recv_pkt)(void *, struct ctdb_req_header *),
304 void *recv_context, bool ignore_generation)
308 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
309 recv_context, ignore_generation);
311 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
313 ctdb_ltdb_unlock(ctdb_db, key);
321 paranoid check to see if the db is empty
/* called from ctdb_control_db_attach() right after the tdb is opened.
   NOTE(review): the condition on count is missing from this view;
   presumably the fatal path is taken when count is greater than 0 */
323 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
325 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* no callback is passed to the traverse, only its return value is
   used */
326 int count = tdb_traverse_read(tdb, NULL, NULL);
328 DEBUG(0,(__location__ " tdb '%s' not empty on attach! aborting\n",
330 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
335 a client has asked to attach a new database
/* attach control handler: indata carries the database name, outdata
   is pointed at the db_id of the (existing or newly created)
   database context.
   Steps visible here: look the name up in ctdb->db_list, otherwise
   allocate a context, derive db_id from ctdb_hash(&indata), reject
   db_id collisions, make sure the db directory exists, open the tdb
   with TDB_CLEAR_IF_FIRST, link the context into db_list, register
   the null and fetch calls, and broadcast the attach to other nodes.
   NOTE(review): the last parameter, the declaration of ret, the
   conditions on ret and every return statement are on lines missing
   from this view */
337 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
/* NOTE(review): db_name is used as a C string below; nothing visible
   here checks that indata is NUL-terminated */
340 const char *db_name = (const char *)indata.dptr;
341 struct ctdb_db_context *ctdb_db, *tmp_db;
344 /* see if we already have this name */
345 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
346 if (strcmp(db_name, tmp_db->db_name) == 0) {
347 /* this is not an error */
/* outdata points at the id stored inside the context, not at a copy */
348 outdata->dptr = (uint8_t *)&tmp_db->db_id;
349 outdata->dsize = sizeof(tmp_db->db_id);
354 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
355 CTDB_NO_MEMORY(ctdb, ctdb_db);
357 ctdb_db->ctdb = ctdb;
358 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
359 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the id is the hash of the whole indata buffer */
361 ctdb_db->db_id = ctdb_hash(&indata);
/* NOTE(review): outdata points into ctdb_db from here on, and the
   failure paths below free ctdb_db without resetting outdata */
363 outdata->dptr = (uint8_t *)&ctdb_db->db_id;
364 outdata->dsize = sizeof(ctdb_db->db_id);
366 /* check for hash collisions */
367 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
368 if (tmp_db->db_id == ctdb_db->db_id) {
369 DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
370 tmp_db->db_id, db_name, tmp_db->db_name));
371 talloc_free(ctdb_db);
/* fall back to a default directory when none has been configured */
376 if (ctdb->db_directory == NULL) {
377 ctdb->db_directory = VARDIR "/ctdb";
380 /* make sure the db directory exists */
381 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
382 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
383 ctdb->db_directory));
384 talloc_free(ctdb_db);
388 /* open the database */
/* NOTE(review): the talloc_asprintf() result is passed to
   tdb_wrap_open() without a NULL check */
389 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
393 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0,
394 TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
395 if (ctdb_db->ltdb == NULL) {
396 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
397 talloc_free(ctdb_db);
401 ctdb_check_db_empty(ctdb_db);
403 DLIST_ADD(ctdb->db_list, ctdb_db);
406 all databases support the "null" function. we need this in
407 order to do forced migration of records
409 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
411 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
/* NOTE(review): ctdb_db is already linked into ctdb->db_list here;
   freeing it with no visible DLIST_REMOVE would leave a dangling list
   entry unless a destructor unlinks it - verify. The same applies to
   the fetch function failure path below */
412 talloc_free(ctdb_db);
417 all databases support the "fetch" function. we need this
418 for efficient Samba3 ctdb fetch
420 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
422 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
423 talloc_free(ctdb_db);
427 /* tell all the other nodes about this database */
/* sent with CTDB_CTRL_FLAG_NOREPLY; the return value of the send is
   not used */
428 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
429 CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
432 DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
439 called when a broadcast seqnum update comes in
/* handles a seqnum update for db_id that originated on srcnode.
   Updates from this node itself are ignored; otherwise the local tdb
   seqnum is bumped with the non-blocking increment and the new value
   is cached in ctdb_db->seqnum.
   NOTE(review): the check of the find_ctdb_db() result and the return
   statements are on lines missing from this view */
441 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
443 struct ctdb_db_context *ctdb_db;
444 if (srcnode == ctdb->vnn) {
445 /* don't update ourselves! */
449 ctdb_db = find_ctdb_db(ctdb, db_id);
451 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
455 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
/* keep the cached value in step with the tdb */
456 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
461 timer to check for seqnum changes in a ltdb and propagate them
/* timed event handler; p is the ctdb_db_context the timer was armed
   for. Compares the tdb seqnum with the cached ctdb_db->seqnum, sends
   a CTDB_CONTROL_UPDATE_SEQNUM broadcast carrying the db_id when they
   differ, updates the cached value and re-arms itself.
   NOTE(review): the declaration of data and the tail of the
   send_control argument list are on lines missing from this view */
463 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
464 struct timeval t, void *p)
466 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
467 struct ctdb_context *ctdb = ctdb_db->ctdb;
468 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
469 if (new_seqnum != ctdb_db->seqnum) {
470 /* something has changed - propagate it */
/* the payload is just the db_id; it points at the context member
   rather than at a copy */
472 data.dptr = (uint8_t *)&ctdb_db->db_id;
473 data.dsize = sizeof(uint32_t);
474 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
475 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
478 ctdb_db->seqnum = new_seqnum;
480 /* setup a new timer */
/* the event is parented on ctdb_db and fires again after
   ctdb->seqnum_frequency */
481 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db,
482 timeval_current_ofs(ctdb->seqnum_frequency, 0),
483 ctdb_ltdb_seqnum_check, ctdb_db);
487 enable seqnum handling on this db
/* looks the database up by db_id, arms the periodic
   ctdb_ltdb_seqnum_check() timer if none is armed yet, enables seqnum
   tracking on the tdb and caches the current seqnum.
   NOTE(review): the check of the find_ctdb_db() result and the return
   statements are on lines missing from this view */
489 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
491 struct ctdb_db_context *ctdb_db;
492 ctdb_db = find_ctdb_db(ctdb, db_id);
494 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* te doubles as the flag for whether a timer is already armed */
498 if (ctdb_db->te == NULL) {
499 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db,
500 timeval_current_ofs(ctdb->seqnum_frequency, 0),
501 ctdb_ltdb_seqnum_check, ctdb_db);
504 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
505 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
510 set the interval used between seqnum checks
/* stores frequency in ctdb->seqnum_frequency. That value is read as
   the first argument of timeval_current_ofs() whenever the seqnum
   timer is armed or re-armed; nothing here reschedules a timer that
   is already pending.
   NOTE(review): the return statement is missing from this view */
512 int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
514 ctdb->seqnum_frequency = frequency;