4 Copyright (C) Andrew Tridgell 2006
6 This library is free software; you can redistribute it and/or
7 modify it under the terms of the GNU Lesser General Public
8 License as published by the Free Software Foundation; either
9 version 2 of the License, or (at your option) any later version.
11 This library is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public
17 License along with this library; if not, write to the Free Software
18 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
31 find an attached ctdb_db handle given a name
/*
 * Walks ctdb->db_list and compares each entry's db_name against name
 * with strcmp().
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the matching entry is returned, or NULL if none matches.
 */
33 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
35 struct ctdb_db_context *tmp_db;
36 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
37 if (strcmp(name, tmp_db->db_name) == 0) {
46 this is the dummy null procedure that all databases support
/*
 * Call handler that ctdb_control_db_attach() registers under
 * CTDB_NULL_FUNC for every attached database.
 * NOTE(review): the body is not visible in this listing.
 */
48 static int ctdb_null_func(struct ctdb_call_info *call)
54 this is a plain fetch procedure that all databases support
/*
 * Call handler that ctdb_control_db_attach() registers under
 * CTDB_FETCH_FUNC for every attached database.
 */
56 static int ctdb_fetch_func(struct ctdb_call_info *call)
/* reply with the record itself: reply_data aliases record_data, no copy is made */
58 call->reply_data = &call->record_data;
64 return the lmaster given a key
/*
 * Maps a key to an lmaster: the key is hashed with ctdb_hash(), reduced
 * modulo the size of ctdb->vnn_map, and the result indexes vnn_map->map.
 * NOTE(review): no guard against vnn_map being NULL or vnn_map->size being
 * zero is visible here; a zero size would be a division by zero.
 */
66 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
68 uint32_t idx, lmaster;
70 idx = ctdb_hash(key) % ctdb->vnn_map->size;
71 lmaster = ctdb->vnn_map->map[idx];
78 construct an initial header for a record with no ltdb header yet
/*
 * Fills in *header for a record that has no stored header yet.
 * Both dmaster and laccessor are set to the lmaster computed for key.
 * NOTE(review): other header fields may be initialised on lines that are
 * not visible in this listing.
 */
80 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
82 struct ctdb_ltdb_header *header)
85 /* initial dmaster is the lmaster */
86 header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
/* the last accessor starts out as the same node as the dmaster */
87 header->laccessor = header->dmaster;
93 fetch a record from the ltdb, separating out the header information
94 and returning the body of the record. A valid (initial) header is
95 returned if the record is not present
/*
 * Reads the record for key from ctdb_db's local tdb and splits it into
 * *header and the record body.
 *
 * ctdb_db: database whose ltdb is read
 * key:     record key
 * header:  receives the stored header, or an initial one if the stored
 *          record is shorter than a header
 * mem_ctx: talloc context that owns the copy of the body
 * data:    receives the size of the body and a talloc'd copy of it
 */
97 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
98 TDB_DATA key, struct ctdb_ltdb_header *header,
99 TALLOC_CTX *mem_ctx, TDB_DATA *data)
102 struct ctdb_context *ctdb = ctdb_db->ctdb;
104 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* anything shorter than a header is treated as "no header stored yet" */
105 if (rec.dsize < sizeof(*header)) {
107 /* return an initial header */
/* NOTE(review): free(NULL) is a no-op, so the rec.dptr test is redundant */
108 if (rec.dptr) free(rec.dptr);
109 ltdb_initial_header(ctdb_db, key, header);
/* write the initial header back to the ltdb.
   NOTE(review): the result of ctdb_ltdb_store() is ignored here */
114 ctdb_ltdb_store(ctdb_db, key, header, d2);
/* NOTE(review): copies the header through a cast of rec.dptr; this assumes
   the buffer returned by tdb_fetch() is suitably aligned - confirm */
118 *header = *(struct ctdb_ltdb_header *)rec.dptr;
/* the body is everything after the header; it is duplicated onto mem_ctx */
121 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
122 data->dptr = talloc_memdup(mem_ctx,
123 sizeof(struct ctdb_ltdb_header)+rec.dptr,
/* allocation check on the duplicated body */
129 CTDB_NO_MEMORY(ctdb, data->dptr);
137 store a record in the ltdb, combining the header information
138 and the body of the record into a single tdb record
139 (the header comes first, followed by the record data)
/*
 * Writes a record to ctdb_db's local tdb as header followed by data.
 *
 * ctdb_db: database whose ltdb is written
 * key:     record key
 * header:  header placed at the start of the stored record
 * data:    record body placed immediately after the header
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the tdb_store() result in ret is returned.
 */
141 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
142 struct ctdb_ltdb_header *header, TDB_DATA data)
144 struct ctdb_context *ctdb = ctdb_db->ctdb;
/* temporary buffer large enough for header + body, allocated on ctdb */
148 rec.dsize = sizeof(*header) + data.dsize;
149 rec.dptr = talloc_size(ctdb, rec.dsize);
150 CTDB_NO_MEMORY(ctdb, rec.dptr);
152 memcpy(rec.dptr, header, sizeof(*header));
153 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
/* TDB_REPLACE: passed so that an existing record for key is overwritten */
155 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
/* the temporary buffer is released after the tdb_store() call */
156 talloc_free(rec.dptr);
163 lock a record in the ltdb, given a key
/*
 * Thin wrapper: takes the tdb chainlock for key on ctdb_db's local tdb and
 * returns the result of tdb_chainlock() unchanged.
 */
165 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
167 return tdb_chainlock(ctdb_db->ltdb->tdb, key);
171 unlock a record in the ltdb, given a key
/*
 * Releases the tdb chainlock for key on ctdb_db's local tdb.
 * NOTE(review): the condition guarding the DEBUG call and the return
 * statement are not visible in this listing; presumably the message is
 * logged when ret is non-zero and ret is returned.
 */
173 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
175 int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
177 DEBUG(0,("tdb_chainunlock failed\n"));
/*
 * State carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback() so a
 * deferred request can be re-submitted.
 * NOTE(review): a recv_context member is used by both of those functions,
 * but its declaration is not visible in this listing.
 */
182 struct lock_fetch_state {
183 struct ctdb_context *ctdb;
/* packet receive function to call again when the request is retried */
184 void (*recv_pkt)(void *, uint8_t *, uint32_t);
/* the deferred request packet */
186 struct ctdb_req_header *hdr;
190 called when we should retry the operation
/*
 * Callback passed to ctdb_lockwait() by ctdb_ltdb_lock_requeue().
 * p is the struct lock_fetch_state that was built for the deferred request.
 * NOTE(review): the result of talloc_get_type() is used with no visible
 * NULL check.
 */
192 static void lock_fetch_callback(void *p)
194 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/* hand the saved packet back to the receive function, using the packet's own length field */
195 state->recv_pkt(state->recv_context, (uint8_t *)state->hdr, state->hdr->length);
196 DEBUG(2,(__location__ " PACKET REQUEUED\n"));
201 do a non-blocking ltdb_lock, deferring this ctdb request until we get the lock
204 It does the following:
206 1) tries to get the chainlock. If it succeeds, then it returns 0
208 2) if it fails to get a chainlock immediately then it sets up a
209 non-blocking chainlock via ctdb_lockwait, and when it gets the
210 chainlock it re-submits this ctdb request to the packet receive function (recv_pkt)
213 This effectively queues all ctdb requests that cannot be
214 immediately satisfied until it can get the lock. This means that
215 the main ctdb daemon will not block waiting for a chainlock that is held elsewhere
218 There are 3 possible return values:
220 0: means that it got the lock immediately.
221 -1: means that it failed to get the lock, and won't retry
222 -2: means that it failed to get the lock immediately, but will retry
/*
 * Tries to take the chainlock for key without blocking. If the lock is
 * contended, the request packet hdr is saved and re-submitted to recv_pkt
 * once ctdb_lockwait() reports that the lock can be taken.
 *
 * ctdb_db:  database whose ltdb chain is locked
 * key:      record key
 * hdr:      the request packet; kept alive if the request is deferred
 * recv_pkt: receive function to call again on retry
 *
 * NOTE(review): several conditions and all return statements are not
 * visible in this listing; the surrounding description gives the return
 * values as 0, -1 and -2.
 */
224 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
225 TDB_DATA key, struct ctdb_req_header *hdr,
226 void (*recv_pkt)(void *, uint8_t *, uint32_t ),
230 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
231 struct lockwait_handle *h;
232 struct lock_fetch_state *state;
234 ret = tdb_chainlock_nonblock(tdb, key);
/* EACCES, EAGAIN and EDEADLK are treated as contention; any other errno is a hard failure */
237 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
238 /* a hard failure - don't try again */
242 /* when torturing, ensure we test the contended path */
243 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
246 tdb_chainunlock(tdb, key);
249 /* first the non-contended path */
/* NOTE(review): state is dereferenced on the next line with no NULL check
   after talloc(). It is also allocated as a talloc child of hdr, while hdr
   is stolen onto state further down - confirm this does not create a
   parent/child cycle */
254 state = talloc(hdr, struct lock_fetch_state);
255 state->ctdb = ctdb_db->ctdb;
257 state->recv_pkt = recv_pkt;
258 state->recv_context = recv_context;
260 /* now the contended path */
261 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
/* NOTE(review): the condition guarding this unlock is not visible;
   presumably it handles ctdb_lockwait() failing */
263 tdb_chainunlock(tdb, key);
267 /* we need to move the packet off the temporary context in ctdb_recv_pkt(),
268 so it won't be freed yet */
269 talloc_steal(state, hdr);
/* the lockwait handle is also reparented onto state */
270 talloc_steal(state, h);
272 /* now tell the caller that we will retry asynchronously */
277 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Takes the chainlock for key via ctdb_ltdb_lock_requeue(), then fetches
 * the record header and body with ctdb_ltdb_fetch().
 *
 * header: receives the record header
 * hdr:    request packet; also used as the talloc context for the body
 * data:   receives the record body
 *
 * NOTE(review): the conditions around the fetch and the unlock are not
 * visible in this listing; presumably the fetch runs only when the lock
 * was obtained and the unlock only when the fetch fails.
 */
279 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
280 TDB_DATA key, struct ctdb_ltdb_header *header,
281 struct ctdb_req_header *hdr, TDB_DATA *data,
282 void (*recv_pkt)(void *, uint8_t *, uint32_t ),
287 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
289 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
291 ctdb_ltdb_unlock(ctdb_db, key);
299 a client has asked to attach a new database
/*
 * Control handler that attaches a database by name.
 *
 * indata:  the database name (indata.dptr is used as a C string)
 * outdata: set to point at the db_id of the existing or newly created
 *          database context
 *
 * If a database with this name is already in ctdb->db_list its id is
 * reported. Otherwise a new ctdb_db_context is created, its tdb is opened
 * under ctdb->db_directory, the null and fetch call handlers are
 * registered, and the attach is broadcast to the other nodes.
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
301 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
/* NOTE(review): indata.dptr is used as a string with no visible check that
   it is NUL-terminated within indata.dsize */
304 const char *db_name = (const char *)indata.dptr;
305 struct ctdb_db_context *ctdb_db, *tmp_db;
308 /* see if we already have this name */
309 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
310 if (strcmp(db_name, tmp_db->db_name) == 0) {
311 /* this is not an error */
312 outdata->dptr = (uint8_t *)&tmp_db->db_id;
313 outdata->dsize = sizeof(tmp_db->db_id);
318 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
319 CTDB_NO_MEMORY(ctdb, ctdb_db);
321 ctdb_db->ctdb = ctdb;
322 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
323 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the database id is the hash of the name blob as received */
325 ctdb_db->db_id = ctdb_hash(&indata);
/* NOTE(review): outdata points into ctdb_db, which is freed on the failure
   paths below; outdata->dptr is then dangling - confirm that callers ignore
   outdata on failure */
327 outdata->dptr = (uint8_t *)&ctdb_db->db_id;
328 outdata->dsize = sizeof(ctdb_db->db_id);
330 /* check for hash collisions */
331 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
332 if (tmp_db->db_id == ctdb_db->db_id) {
/* NOTE(review): the format has three conversions (%x, %s, %s) but only two
   arguments are passed; the db_id argument for %x is missing, so db_name is
   consumed by %x and the %s conversions read the wrong arguments */
333 DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
334 db_name, tmp_db->db_name));
335 talloc_free(ctdb_db);
/* fall back to VARDIR "/ctdb" when no database directory is configured */
340 if (ctdb->db_directory == NULL) {
341 ctdb->db_directory = VARDIR "/ctdb";
344 /* make sure the db directory exists */
345 if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
346 DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n",
347 ctdb->db_directory));
348 talloc_free(ctdb_db);
352 /* open the database */
/* NOTE(review): no NULL check on db_path is visible before it is used */
353 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
/* NOTE(review): the tdb wrapper is parented on ctdb rather than ctdb_db, so
   the talloc_free(ctdb_db) calls on later failure paths would not release
   it - confirm */
357 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0,
358 TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
359 if (ctdb_db->ltdb == NULL) {
360 DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
361 talloc_free(ctdb_db);
/* NOTE(review): ctdb_db is linked into ctdb->db_list here, but the failure
   paths below free it with no visible DLIST_REMOVE; unless a talloc
   destructor unlinks it, the list is left with a dangling entry */
365 DLIST_ADD(ctdb->db_list, ctdb_db);
368 all databases support the "null" function. we need this in
369 order to do forced migration of records
371 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
373 DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
374 talloc_free(ctdb_db);
379 all databases support the "fetch" function. we need this
380 for efficient Samba3 ctdb fetch
382 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
384 DEBUG(0,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
385 talloc_free(ctdb_db);
389 /* tell all the other nodes about this database */
/* sent with CTDB_CTRL_FLAG_NOREPLY */
390 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
391 CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
394 DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
401 called when a broadcast seqnum update comes in
/*
 * Handles a seqnum update for database db_id that was sent by srcnode.
 * Updates that originate from this node (srcnode == ctdb->vnn) are ignored.
 * Otherwise the local tdb's sequence number is incremented and the new
 * value is cached in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements and the check on the find_ctdb_db()
 * result are not visible in this listing.
 */
403 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
405 struct ctdb_db_context *ctdb_db;
406 if (srcnode == ctdb->vnn) {
407 /* don't update ourselves! */
411 ctdb_db = find_ctdb_db(ctdb, db_id);
/* NOTE(review): the format contains %x but no argument is passed; db_id
   should be supplied */
413 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n"));
/* bump the local seqnum, then cache the new value */
417 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
418 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
423 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timed-event handler; p is the ctdb_db_context being monitored.
 * Compares the tdb's current sequence number with the cached
 * ctdb_db->seqnum. On a change it sends a CTDB_CONTROL_UPDATE_SEQNUM
 * control carrying the db_id, then updates the cached value.
 * Finally it re-arms itself by adding a new timed event.
 */
425 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
426 struct timeval t, void *p)
428 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
429 struct ctdb_context *ctdb = ctdb_db->ctdb;
430 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
431 if (new_seqnum != ctdb_db->seqnum) {
432 /* something has changed - propagate it */
/* the control payload is just the 32-bit db_id */
434 data.dptr = (uint8_t *)&ctdb_db->db_id;
435 data.dsize = sizeof(uint32_t);
436 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
437 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
440 ctdb_db->seqnum = new_seqnum;
442 /* setup a new timer */
/* the timer offset comes from ctdb->seqnum_frequency; the event is added
   with ctdb_db as its talloc context */
443 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db,
444 timeval_current_ofs(ctdb->seqnum_frequency, 0),
445 ctdb_ltdb_seqnum_check, ctdb_db);
449 enable seqnum handling on this db
/*
 * Turns on sequence-number tracking for database db_id.
 * Starts the periodic ctdb_ltdb_seqnum_check() timer if it is not already
 * running, enables seqnum handling on the tdb, and caches the current
 * sequence number in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements and the check on the find_ctdb_db()
 * result are not visible in this listing.
 */
451 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
453 struct ctdb_db_context *ctdb_db;
454 ctdb_db = find_ctdb_db(ctdb, db_id);
/* NOTE(review): the format contains %x but no argument is passed; db_id
   should be supplied */
456 DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n"));
/* only one timer per database: skip if one is already registered */
460 if (ctdb_db->te == NULL) {
461 ctdb_db->te = event_add_timed(ctdb->ev, ctdb_db,
462 timeval_current_ofs(ctdb->seqnum_frequency, 0),
463 ctdb_ltdb_seqnum_check, ctdb_db);
466 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
467 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
472 set the frequency used for the seqnum check timer
/*
 * Stores frequency in ctdb->seqnum_frequency. That value is passed to
 * timeval_current_ofs() when the seqnum check timer is armed, so a change
 * is picked up the next time a timer is added.
 * NOTE(review): the return statement is not visible in this listing.
 */
474 int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
476 ctdb->seqnum_frequency = frequency;