2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State kept for a transaction on a ctdb-backed database: the owning
 * database context plus two marshall buffers that log the operations
 * performed while the transaction is open. m_all receives both reads and
 * writes, m_write receives only the writes (see db_ctdb_transaction_fetch()
 * and db_ctdb_transaction_store()).
 */
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
/*
 * Fields of the per-database context (accessed as struct db_ctdb_ctx
 * elsewhere in this file): the wrapped handle of the local tdb and the
 * currently open transaction. The transaction pointer is set to NULL when
 * the database is opened and whenever a transaction ends.
 */
37 struct tdb_wrap *wtdb;
39 struct db_ctdb_transaction_handle *transaction;
/*
 * Fields of the per-record private data (accessed as struct db_ctdb_rec
 * elsewhere in this file): the database context the record belongs to and
 * a copy of the ctdb ltdb header that fetch_locked_internal() read in front
 * of the record's data.
 */
43 struct db_ctdb_ctx *ctdb_ctx;
44 struct ctdb_ltdb_header header;
/*
 * Forward declaration: fetch_locked_internal() is defined further down in
 * this file but is already needed by db_ctdb_transaction_fetch_start().
 */
47 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
/*
 * Translate the error currently reported by tdb_error() for a tdb context
 * into an NTSTATUS code.
 *
 * tdb: the tdb context to query.
 *
 * The statuses produced are NT_STATUS_OBJECT_NAME_COLLISION,
 * NT_STATUS_OBJECT_NAME_NOT_FOUND and NT_STATUS_INTERNAL_DB_CORRUPTION.
 * NOTE(review): which tdb error selects which status is not visible here -
 * confirm against the case labels.
 */
52 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
55 enum TDB_ERROR tret = tdb_error(tdb);
59 status = NT_STATUS_OBJECT_NAME_COLLISION;
62 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
65 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
75 form a ctdb_rec_data record from a key/data pair
77 note that header may be NULL. If not NULL then it is included in the data portion
/*
 * Build one ctdb_rec_data blob from a key, an optional ltdb header and a
 * data blob.
 *
 * mem_ctx: talloc parent of the new blob.
 * reqid:   NOTE(review): presumably stored in the blob - the assignment is
 *          not visible here, confirm.
 * key:     copied to the start of d->data; its size is stored in d->keylen.
 * header:  may be NULL. If present it is copied directly behind the key
 *          and its size is included in d->datalen.
 * data:    copied behind the key (and behind the header, if there is one).
 *
 * The blob is allocated in one piece whose size is the fixed part of
 * struct ctdb_rec_data plus key, data and (if present) header.
 */
80 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
82 struct ctdb_ltdb_header *header,
86 struct ctdb_rec_data *d;
88 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
89 data.dsize + (header?sizeof(*header):0);
90 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
96 d->keylen = key.dsize;
97 memcpy(&d->data[0], key.dptr, key.dsize);
99 d->datalen = data.dsize + sizeof(*header);
100 memcpy(&d->data[key.dsize], header, sizeof(*header));
101 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103 d->datalen = data.dsize;
104 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
110 /*
 * Helper for marshalling multiple records: append one record to a
 * marshall buffer.
 *
 * The record is built with db_ctdb_marshall_record() and then copied to
 * the end of m, after m has been grown by the talloc size of the new
 * record with talloc_realloc_size().
 *
 * mem_ctx: talloc parent used for the record and for the buffer.
 * m:       buffer to append to. A zeroed buffer consisting only of the
 *          fixed part of struct ctdb_marshall_buffer is allocated in this
 *          function; NOTE(review): presumably when m is NULL on entry -
 *          the guarding condition is not visible here.
 * header:  optional ltdb header, passed through to
 *          db_ctdb_marshall_record().
 *
 * Callers assign the result back to their buffer pointer and treat NULL as
 * failure.
 */
111 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
112 struct ctdb_marshall_buffer *m,
116 struct ctdb_ltdb_header *header,
119 struct ctdb_rec_data *r;
120 size_t m_size, r_size;
121 struct ctdb_marshall_buffer *m2;
123 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
130 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
137 m_size = talloc_get_size(m);
138 r_size = talloc_get_size(r);
140 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
146 memcpy(m_size + (uint8_t *)m2, r, r_size);
155 /*
 * We've finished marshalling: return a data blob describing the
 * marshalled records.
 *
 * The blob is not a copy. Its pointer is m itself and its size is the
 * talloc size of m, so it is only valid for as long as m is.
 */
156 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
159 data.dptr = (uint8_t *)m;
160 data.dsize = talloc_get_size(m);
165 loop over a marshalling buffer
167 - pass r==NULL to start
168 - loop the number of times indicated by m->count
/*
 * Step to the next record of a marshall buffer and hand back its parts.
 *
 * m:      buffer being walked.
 * r:      record returned by the previous call. The first record is taken
 *         from the start of m->data, every further one is found r->length
 *         bytes behind r.
 * header: if not NULL, receives a copy of the ltdb header stored in front
 *         of the record's data, and the data blob handed back is moved
 *         past that header.
 * key:    receives a pointer into the record and r->keylen (no copy).
 * data:   receives a pointer into the record and the data length (no copy).
 *
 * When a header is requested, a record whose datalen is smaller than the
 * header is detected. NOTE(review): the resulting return value is not
 * visible here - confirm.
 * NOTE(review): the call in ctdb_replay_transaction() passes six
 * arguments, so one parameter between r and header is not visible here.
 */
170 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA *key, TDB_DATA *data)
176 r = (struct ctdb_rec_data *)&m->data[0];
178 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
186 key->dptr = &r->data[0];
187 key->dsize = r->keylen;
190 data->dptr = &r->data[r->keylen];
191 data->dsize = r->datalen;
192 if (header != NULL) {
193 data->dptr += sizeof(*header);
194 data->dsize -= sizeof(*header);
198 if (header != NULL) {
199 if (r->datalen < sizeof(*header)) {
202 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
210 /*
 * Destructor for a transaction handle: cancels the transaction that is
 * open on the local tdb. db_ctdb_transaction_start() installs it on the
 * handle and db_ctdb_transaction_commit() removes it again before
 * committing.
 */
211 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 tdb_transaction_cancel(h->ctx->wtdb->tdb);
217 /*
 * Take the transaction lock record and start a transaction on the local
 * tdb.
 *
 * The record stored under CTDB_TRANSACTION_LOCK_KEY is fetched locked via
 * fetch_locked_internal() (persistent variant), then
 * tdb_transaction_start() is called. Afterwards the lock record is read
 * again and checked: if it is missing, shorter than an ltdb header, or its
 * dmaster is not this node, the tdb transaction is cancelled again.
 * NOTE(review): what follows the cancel (retry or error return) is not
 * visible here - confirm.
 *
 * Temporaries hang off a talloc context that is a child of h and is freed
 * on every visible path.
 */
218 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 struct db_record *rh;
223 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225 struct db_ctdb_ctx *ctx = h->ctx;
228 key.dptr = discard_const(keyname);
229 key.dsize = strlen(keyname);
232 tmp_ctx = talloc_new(h);
234 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
237 talloc_free(tmp_ctx);
242 ret = tdb_transaction_start(ctx->wtdb->tdb);
244 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
245 talloc_free(tmp_ctx);
249 data = tdb_fetch(ctx->wtdb->tdb, key);
250 if ((data.dptr == NULL) ||
251 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
252 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
253 SAFE_FREE(data.dptr);
254 tdb_transaction_cancel(ctx->wtdb->tdb);
255 talloc_free(tmp_ctx);
259 SAFE_FREE(data.dptr);
260 talloc_free(tmp_ctx);
266 /*
 * Start a transaction on a database (the transaction_start method of the
 * db_context).
 *
 * Transactions are refused on non-persistent databases and while another
 * transaction is already open; both cases are logged at debug level 0.
 * Otherwise a zeroed transaction handle is allocated as a talloc child of
 * db, the tdb-level transaction is started through
 * db_ctdb_transaction_fetch_start(), db_ctdb_transaction_destructor() is
 * installed on the handle and the handle is recorded in ctx->transaction.
 * NOTE(review): the return values are not visible here - confirm.
 */
267 static int db_ctdb_transaction_start(struct db_context *db)
269 struct db_ctdb_transaction_handle *h;
271 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
274 if (!db->persistent) {
275 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
280 if (ctx->transaction) {
281 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
285 h = talloc_zero(db, struct db_ctdb_transaction_handle);
287 DEBUG(0,(__location__ " oom for transaction handle\n"));
293 ret = db_ctdb_transaction_fetch_start(h);
299 talloc_set_destructor(h, db_ctdb_transaction_destructor);
301 ctx->transaction = h;
303 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
311 fetch a record inside a transaction
/*
 * Fetch a record while a transaction is open.
 *
 * db:   database context; db->transaction is dereferenced without a check,
 *       so a transaction must be open.
 * key:  key to look up in the local tdb.
 * data: receives the record contents without the ltdb header. The payload
 *       is duplicated onto mem_ctx.
 *
 * Every fetch is also appended to the transaction's m_all log so that a
 * replay can verify it later.
 * NOTE(review): the header size is subtracted from data->dsize without a
 * visible check that the stored record is at least that large - confirm.
 */
313 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
315 TDB_DATA key, TDB_DATA *data)
317 struct db_ctdb_transaction_handle *h = db->transaction;
319 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321 if (data->dptr != NULL) {
322 uint8_t *oldptr = (uint8_t *)data->dptr;
323 data->dsize -= sizeof(struct ctdb_ltdb_header);
324 if (data->dsize == 0) {
327 data->dptr = (uint8 *)
329 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
333 if (data->dptr == NULL && data->dsize != 0) {
/* log the read (reqid 1, no header) in the list of all operations */
339 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
340 if (h->m_all == NULL) {
341 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
343 talloc_free(data->dptr);
/*
 * Forward declarations of the record callbacks used inside a transaction;
 * db_ctdb_fetch_locked_transaction() below installs them before they are
 * defined.
 */
352 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
353 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
/*
 * fetch_locked variant used while a transaction is open.
 *
 * Builds a db_record on mem_ctx whose private data is the current
 * transaction handle and whose store and delete_rec callbacks are the
 * transaction versions. The key is duplicated onto the record. The value
 * is read from the local tdb: a missing record yields tdb_null, otherwise
 * the ltdb header is stripped and the remainder is duplicated onto the
 * record. No chain lock is taken in the visible code.
 */
355 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
359 struct db_record *result;
362 if (!(result = talloc(mem_ctx, struct db_record))) {
363 DEBUG(0, ("talloc failed\n"));
367 result->private_data = ctx->transaction;
369 result->key.dsize = key.dsize;
370 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
371 if (result->key.dptr == NULL) {
372 DEBUG(0, ("talloc failed\n"));
377 result->store = db_ctdb_store_transaction;
378 result->delete_rec = db_ctdb_delete_transaction;
380 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
381 if (ctdb_data.dptr == NULL) {
382 /* create the record */
383 result->value = tdb_null;
387 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
388 result->value.dptr = NULL;
390 if ((result->value.dsize != 0)
391 && !(result->value.dptr = (uint8 *)talloc_memdup(
392 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
393 result->value.dsize))) {
394 DEBUG(0, ("talloc failed\n"));
398 SAFE_FREE(ctdb_data.dptr);
405 stores a record inside a transaction
/*
 * Store a record inside a transaction.
 *
 * h:    transaction handle.
 * key:  record key.
 * data: new contents; the delete callback passes tdb_null.
 *
 * The existing record is fetched to obtain its ltdb header. If there is no
 * record yet, a header naming this node as dmaster is used. The write is
 * logged twice and then applied to the local tdb with TDB_REPLACE.
 * Callers treat a non-zero result as failure.
 * NOTE(review): the comment below mentions updating the RSN, but that
 * update is not visible here - confirm.
 */
407 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
408 TDB_DATA key, TDB_DATA data)
410 TALLOC_CTX *tmp_ctx = talloc_new(h);
413 struct ctdb_ltdb_header header;
415 /* we need the header so we can update the RSN */
416 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
417 if (rec.dptr == NULL) {
418 /* the record doesn't exist - create one with us as dmaster.
419 This is only safe because we are in a transaction and this
420 is a persistent database */
422 header.dmaster = get_my_vnn();
424 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
/* log the write (reqid 0, payload only) in the list of all operations */
431 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
432 if (h->m_all == NULL) {
433 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
434 talloc_free(tmp_ctx);
/* log the write together with its header; the commit sends this buffer to ctdbd */
438 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
439 if (h->m_write == NULL) {
440 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
441 talloc_free(tmp_ctx);
/* build header followed by data and write it to the local tdb */
446 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
447 rec.dptr = talloc_size(tmp_ctx, rec.dsize);
448 if (rec.dptr == NULL) {
449 DEBUG(0,(__location__ " Failed to alloc record\n"));
450 talloc_free(tmp_ctx);
453 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
454 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
456 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
458 talloc_free(tmp_ctx);
465 a record store inside a transaction
/*
 * Record store callback used inside a transaction.
 *
 * rec->private_data must be the transaction handle (checked with
 * talloc_get_type_abort). The work is done by db_ctdb_transaction_store()
 * with the record's key. The flag argument is not used in the visible
 * code. An NTSTATUS derived from the local tdb's error state is returned,
 * NOTE(review): presumably only when the store failed - the condition is
 * not visible here.
 */
467 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
469 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
470 rec->private_data, struct db_ctdb_transaction_handle);
473 ret = db_ctdb_transaction_store(h, rec->key, data);
475 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
481 a record delete inside a transaction
/*
 * Record delete callback used inside a transaction.
 *
 * A delete is expressed as a store of tdb_null through
 * db_ctdb_transaction_store(). rec->private_data must be the transaction
 * handle. An NTSTATUS derived from the local tdb's error state is
 * returned, NOTE(review): presumably only when the store failed - the
 * condition is not visible here.
 */
483 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
485 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
486 rec->private_data, struct db_ctdb_transaction_handle);
489 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
491 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
 * Replay the operations logged in h->m_all; called by
 * db_ctdb_transaction_commit() after the commit to the other nodes failed.
 *
 * A new tdb transaction is started with db_ctdb_transaction_fetch_start()
 * and the log is walked record by record. Entries with reqid 0 are writes
 * and are stored again. The remaining visible code handles reads: the
 * record is fetched again and compared with the logged data, and a
 * difference in size or content makes the replay give up.
 * NOTE(review): the tdb transaction is cancelled at the end of the visible
 * code, presumably on the failure path only - confirm.
 */
500 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
503 struct ctdb_rec_data *rec = NULL;
507 ret = db_ctdb_transaction_fetch_start(h);
512 for (i=0;i<h->m_all->count;i++) {
515 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
517 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
521 if (rec->reqid == 0) {
523 if (db_ctdb_transaction_store(h, key, data) != 0) {
528 TALLOC_CTX *tmp_ctx = talloc_new(h);
530 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
531 talloc_free(tmp_ctx);
534 if (data2.dsize != data.dsize ||
535 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
536 /* the record has changed on us - we have to give up */
537 talloc_free(tmp_ctx);
540 talloc_free(tmp_ctx);
547 tdb_transaction_cancel(h->ctx->wtdb->tdb);
/*
 * Commit the transaction that is open on db (the transaction_commit
 * method of the db_context).
 *
 * A commit without an open transaction is logged at debug level 0. If no
 * writes were logged (h->m_write is NULL) there is nothing to distribute
 * and ctx->transaction is cleared. Otherwise the handle's destructor is
 * removed and the write log is sent to ctdbd with
 * CTDB_CONTROL_TRANS2_COMMIT. If that control fails, the local tdb
 * transaction is cancelled and the operations are replayed with
 * ctdb_replay_transaction(); a failed replay is reported to ctdbd with
 * CTDB_CONTROL_TRANS2_ERROR. Once the control has succeeded the local tdb
 * transaction is committed: failure is reported with
 * CTDB_CONTROL_TRANS2_ERROR, success with CTDB_CONTROL_TRANS2_FINISHED.
 * The transaction pointer in the context is cleared on each of these
 * paths.
 * NOTE(review): return values, the retry structure and the release of the
 * handle are not visible here - confirm.
 */
555 static int db_ctdb_transaction_commit(struct db_context *db)
557 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
562 struct db_ctdb_transaction_handle *h = ctx->transaction;
565 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
569 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
571 if (h->m_write == NULL) {
572 /* no changes were made */
574 ctx->transaction = NULL;
578 talloc_set_destructor(h, NULL);
580 /* our commit strategy is quite complex.
582 - we first try to commit the changes to all other nodes
584 - if that works, then we commit locally and we are done
586 - if a commit on another node fails, then we need to cancel
587 the transaction, then restart the transaction (thus
588 opening a window of time for a pending recovery to
589 complete), then replay the transaction, checking all the
590 reads and writes (checking that reads give the same data,
591 and writes succeed). Then we retry the transaction to the
596 /* tell ctdbd to commit to the other nodes */
597 rets = ctdbd_control_local(messaging_ctdbd_connection(),
598 CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
599 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
600 if (!NT_STATUS_IS_OK(rets) || status != 0) {
601 tdb_transaction_cancel(h->ctx->wtdb->tdb);
603 if (ctdb_replay_transaction(h) != 0) {
604 DEBUG(0,(__location__ " Failed to replay transaction\n"));
605 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
606 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
607 tdb_null, NULL, NULL, NULL);
608 h->ctx->transaction = NULL;
610 ctx->transaction = NULL;
616 /* do the real commit locally */
617 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
619 DEBUG(0,(__location__ " Failed to commit transaction\n"));
620 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id,
621 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
622 h->ctx->transaction = NULL;
627 /* tell ctdbd that we are finished with our local commit */
628 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
629 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
630 tdb_null, NULL, NULL, NULL);
631 h->ctx->transaction = NULL;
/*
 * Cancel the transaction that is open on db (the transaction_cancel
 * method of the db_context).
 *
 * A cancel without an open transaction is logged at debug level 0.
 * Otherwise the cancel is logged at level 5 and ctx->transaction is
 * cleared.
 * NOTE(review): the release of the handle (which would run the destructor
 * that cancels the tdb transaction) is not visible here - confirm.
 */
640 static int db_ctdb_transaction_cancel(struct db_context *db)
642 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
644 struct db_ctdb_transaction_handle *h = ctx->transaction;
647 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
651 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
653 ctx->transaction = NULL;
/*
 * Record store callback that writes straight into the local tdb;
 * fetch_locked_internal() installs it as the alternative to
 * db_ctdb_store_persistent().
 *
 * rec:  record obtained from fetch_locked_internal(); its private data
 *       must be a struct db_ctdb_rec.
 * data: new contents.
 * flag: not used in the visible code.
 *
 * A temporary malloc'ed buffer holding the cached ltdb header followed by
 * data is built, stored under rec->key with TDB_REPLACE and freed again.
 * Returns NT_STATUS_NO_MEMORY if the buffer cannot be allocated,
 * NT_STATUS_OK if the store succeeded and otherwise the status mapped from
 * the tdb error.
 */
659 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
661 struct db_ctdb_rec *crec = talloc_get_type_abort(
662 rec->private_data, struct db_ctdb_rec);
666 cdata.dsize = sizeof(crec->header) + data.dsize;
668 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
669 return NT_STATUS_NO_MEMORY;
672 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
673 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
675 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
677 SAFE_FREE(cdata.dptr);
679 return (ret == 0) ? NT_STATUS_OK
680 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
684 /* for persistent databases the store is a bit different. We have to
685 ask the ctdb daemon to push the record to all nodes after the
/*
 * Record store callback for persistent databases.
 *
 * The store is attempted repeatedly until it succeeds or the retry limit
 * is reached. The limit is read with lp_parm_int() from the
 * "max store retries" option of "dbwrap ctdb", with 5 passed as the
 * fallback value. The visible steps of one attempt are:
 *  - fetch the record locked again through fetch_locked_internal()
 *    (persistent variant),
 *  - build the ltdb header followed by data in a malloc'ed buffer,
 *  - announce the update with ctdbd_start_persistent_update(),
 *  - if that worked, write the buffer to the local tdb,
 *  - drop the chain lock and remove the record's destructor,
 *  - then either ask ctdbd to distribute the record with
 *    ctdbd_persistent_store() or withdraw the announcement with
 *    ctdbd_cancel_persistent_update(),
 *  - free the buffer.
 * A store that still fails after the last attempt is logged at level 5.
 *
 * NOTE(review): the long comment in the loop says the first variant
 * (re-lock) was chosen, but the visible code unlocks and removes the
 * destructor, which matches the second variant - confirm and correct the
 * comment.
 */
687 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
689 struct db_ctdb_rec *crec;
690 struct db_record *record;
695 int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
697 for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
698 (count < max_retries) && !NT_STATUS_IS_OK(status);
704 * There is a hack here: We use rec as a memory
705 * context and re-use it as the record struct ptr.
706 * We don't free the record data allocated
707 * in each turn. So all gets freed when the caller
708 * releases the original record. This is because
709 * we don't get the record passed in by reference
710 * in the first place and the caller relies on
711 * having to free the record himself.
713 record = fetch_locked_internal(crec->ctdb_ctx,
716 true /* persistent */);
717 if (record == NULL) {
718 DEBUG(5, ("fetch_locked_internal failed.\n"));
719 status = NT_STATUS_NO_MEMORY;
724 crec = talloc_get_type_abort(record->private_data,
727 cdata.dsize = sizeof(crec->header) + data.dsize;
729 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
730 return NT_STATUS_NO_MEMORY;
735 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
736 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
738 status = ctdbd_start_persistent_update(
739 messaging_ctdbd_connection(),
740 crec->ctdb_ctx->db_id,
744 if (NT_STATUS_IS_OK(status)) {
745 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
749 : tdb_error_to_ntstatus(
750 crec->ctdb_ctx->wtdb->tdb);
754 * release the lock *now* in order to prevent deadlocks.
756 * There is a tradeoff: Usually, the record is still locked
757 * after db->store operation. This lock is usually released
758 * via the talloc destructor with the TALLOC_FREE to
759 * the record. So we have two choices:
761 * - Either re-lock the record after the call to persistent_store
762 * or cancel_persistent update and this way not changing any
763 * assumptions callers may have about the state, but possibly
764 * introducing new race conditions.
766 * - Or don't lock the record again but just remove the
767 * talloc_destructor. This is less racy but assumes that
768 * the lock is always released via TALLOC_FREE of the record.
770 * I choose the first variant for now since it seems less racy.
771 * We can't guarantee that we succeed in getting the lock
772 * anyways. The only real danger here is that a caller
773 * performs multiple store operations after a fetch_locked()
774 * which is currently not the case.
776 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
777 talloc_set_destructor(record, NULL);
779 /* now tell ctdbd to update this record on all other nodes */
780 if (NT_STATUS_IS_OK(status)) {
781 status = ctdbd_persistent_store(
782 messaging_ctdbd_connection(),
783 crec->ctdb_ctx->db_id,
787 ctdbd_cancel_persistent_update(
788 messaging_ctdbd_connection(),
789 crec->ctdb_ctx->db_id,
795 SAFE_FREE(cdata.dptr);
798 if (!NT_STATUS_IS_OK(status)) {
799 DEBUG(5, ("ctdbd_persistent_store failed after "
800 "%d retries with error %s - giving up.\n",
801 count, nt_errstr(status)));
804 SAFE_FREE(cdata.dptr);
/*
 * Record delete callback paired with db_ctdb_store().
 *
 * The record is not removed from the tdb. The delete is carried out as a
 * db_ctdb_store() call with flag 0 which, according to the comment below,
 * stores the header with empty data.
 * NOTE(review): the initialisation of the data blob that is passed on is
 * not visible here - confirm it is empty.
 */
809 static NTSTATUS db_ctdb_delete(struct db_record *rec)
814 * We have to store the header with empty data. TODO: Fix the
820 return db_ctdb_store(rec, data, 0);
/*
 * Record delete callback paired with db_ctdb_store_persistent().
 *
 * The delete is carried out as a db_ctdb_store_persistent() call with flag
 * 0 which, according to the comment below, stores the header with empty
 * data.
 * NOTE(review): the initialisation of the data blob that is passed on is
 * not visible here - confirm it is empty.
 */
824 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
829 * We have to store the header with empty data. TODO: Fix the
835 return db_ctdb_store_persistent(rec, data, 0);
/*
 * Talloc destructor installed on records returned by
 * fetch_locked_internal(): releases the chain lock held for the record's
 * key in the local tdb.
 *
 * The unlock is logged at debug level 10 with the hex-encoded key, cut to
 * 20 characters unless DEBUGLEVEL is above 10. A failing
 * tdb_chainunlock() is logged at level 0.
 */
839 static int db_ctdb_record_destr(struct db_record* data)
841 struct db_ctdb_rec *crec = talloc_get_type_abort(
842 data->private_data, struct db_ctdb_rec);
844 DEBUG(10, (DEBUGLEVEL > 10
845 ? "Unlocking db %u key %s\n"
846 : "Unlocking db %u key %.20s\n",
847 (int)crec->ctdb_ctx->db_id,
848 hex_encode(data, (unsigned char *)data->key.dptr,
851 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
852 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Fetch a record with its hash chain locked, asking ctdbd to migrate the
 * record to this node first when necessary.
 *
 * ctx: database context. If a transaction is open, the call is handed to
 *      db_ctdb_fetch_locked_transaction() instead.
 * The remaining parameters are not visible in the signature here. The body
 * uses mem_ctx as talloc parent of the result and key as the record key;
 * callers pass a persistent flag as fourth argument.
 *
 * A db_record with a zeroed struct db_ctdb_rec as private data is
 * allocated and the key is duplicated onto it. Then the chain is locked,
 * the store and delete_rec callbacks are set (persistent or plain
 * variants), db_ctdb_record_destr() is installed and the record is read
 * from the local tdb. If the record is missing, shorter than an ltdb
 * header or not mastered by this node, the lock is dropped, the
 * destructor is removed and ctdbd_migrate() is called before trying
 * again. More than 10 migrate attempts are logged at level 0.
 * On success the ltdb header is copied into the private data and the rest
 * of the record is duplicated onto the result as its value.
 *
 * NOTE(review): the random() term in the dmaster check looks like test
 * code - confirm whether it is compiled in.
 */
859 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
864 struct db_record *result;
865 struct db_ctdb_rec *crec;
868 int migrate_attempts = 0;
870 if (ctx->transaction != NULL) {
871 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
874 if (!(result = talloc(mem_ctx, struct db_record))) {
875 DEBUG(0, ("talloc failed\n"));
879 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
880 DEBUG(0, ("talloc failed\n"));
885 result->private_data = (void *)crec;
886 crec->ctdb_ctx = ctx;
888 result->key.dsize = key.dsize;
889 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
890 if (result->key.dptr == NULL) {
891 DEBUG(0, ("talloc failed\n"));
897 * Do a blocking lock on the record
901 if (DEBUGLEVEL >= 10) {
902 char *keystr = hex_encode(result, key.dptr, key.dsize);
903 DEBUG(10, (DEBUGLEVEL > 10
904 ? "Locking db %u key %s\n"
905 : "Locking db %u key %.20s\n",
906 (int)crec->ctdb_ctx->db_id, keystr));
910 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
911 DEBUG(3, ("tdb_chainlock failed\n"));
917 result->store = db_ctdb_store_persistent;
918 result->delete_rec = db_ctdb_delete_persistent;
920 result->store = db_ctdb_store;
921 result->delete_rec = db_ctdb_delete;
923 talloc_set_destructor(result, db_ctdb_record_destr);
925 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
928 * See if we have a valid record and we are the dmaster. If so, we can
929 * take the shortcut and just return it.
932 if ((ctdb_data.dptr == NULL) ||
933 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
934 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
936 || (random() % 2 != 0)
939 SAFE_FREE(ctdb_data.dptr);
940 tdb_chainunlock(ctx->wtdb->tdb, key);
941 talloc_set_destructor(result, NULL);
943 migrate_attempts += 1;
945 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
946 ctdb_data.dptr, ctdb_data.dptr ?
947 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
950 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
951 if (!NT_STATUS_IS_OK(status)) {
952 DEBUG(5, ("ctdb_migrate failed: %s\n",
957 /* now it's migrated, try again */
961 if (migrate_attempts > 10) {
962 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
966 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
968 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
969 result->value.dptr = NULL;
971 if ((result->value.dsize != 0)
972 && !(result->value.dptr = (uint8 *)talloc_memdup(
973 result, ctdb_data.dptr + sizeof(crec->header),
974 result->value.dsize))) {
975 DEBUG(0, ("talloc failed\n"));
979 SAFE_FREE(ctdb_data.dptr);
/*
 * fetch_locked method of the db_context: unwraps the ctdb context from
 * db->private_data (aborting on a type mismatch) and forwards to
 * fetch_locked_internal() with the database's persistent flag.
 */
984 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
988 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
991 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
995 fetch (unlocked, no migration) operation on ctdb
/*
 * fetch method of the db_context: read a record without locking it and
 * without migrating it.
 *
 * db:      database to read from.
 * mem_ctx: talloc parent of the returned data.
 * key:     key to look up.
 * data:    receives the record contents without the ltdb header.
 *
 * While a transaction is open the call is handed to
 * db_ctdb_transaction_fetch(). Otherwise the local tdb is tried first: a
 * record that is at least header-sized and passes the dmaster check is
 * copied onto mem_ctx. The existing comment states that the dmaster check
 * is bypassed for persistent databases. If the local copy cannot be used,
 * ctdbd is asked for the record with ctdbd_fetch(); a failure there is
 * logged at level 5.
 * NOTE(review): the return values are not visible here - confirm.
 */
997 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
998 TDB_DATA key, TDB_DATA *data)
1000 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1001 struct db_ctdb_ctx);
1005 if (ctx->transaction) {
1006 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1009 /* try a direct fetch */
1010 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1013 * See if we have a valid record and we are the dmaster. If so, we can
1014 * take the shortcut and just return it.
1015 * we bypass the dmaster check for persistent databases
1017 if ((ctdb_data.dptr != NULL) &&
1018 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1020 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1021 /* we are the dmaster - avoid the ctdb protocol op */
1023 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1024 if (data->dsize == 0) {
1025 SAFE_FREE(ctdb_data.dptr);
1030 data->dptr = (uint8 *)talloc_memdup(
1031 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1034 SAFE_FREE(ctdb_data.dptr);
1036 if (data->dptr == NULL) {
1042 SAFE_FREE(ctdb_data.dptr);
1044 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1045 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1046 if (!NT_STATUS_IS_OK(status)) {
1047 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * State handed to the traverse callbacks: the database being traversed
 * and the caller's per-record function. The callbacks also read a
 * private_data member that is passed back to fn.
 */
1054 struct traverse_state {
1055 struct db_context *db;
1056 int (*fn)(struct db_record *rec, void *private_data);
/*
 * Per-record callback passed to ctdbd_traverse() by db_ctdb_traverse().
 *
 * The record is fetched locked under a temporary talloc context and the
 * caller's function is invoked only if the record has a non-empty value.
 * Freeing the temporary context afterwards also releases the record. The
 * data argument is not used in the visible code, and the caller's return
 * value is not captured.
 */
1060 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1062 struct traverse_state *state = (struct traverse_state *)private_data;
1063 struct db_record *rec;
1064 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1065 /* we have to give them a locked record to prevent races */
1066 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1067 if (rec && rec->value.dsize > 0) {
1068 state->fn(rec, state->private_data);
1070 talloc_free(tmp_ctx);
/*
 * Per-record callback passed to tdb_traverse() for persistent databases.
 *
 * Works like traverse_callback(): the record is fetched locked under a
 * temporary talloc context and the caller's function is invoked only for
 * records with a non-empty value. Here the function's return value is
 * captured in ret.
 * NOTE(review): the return statement is not visible here - presumably ret
 * is handed back to tdb_traverse(), confirm.
 */
1073 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1076 struct traverse_state *state = (struct traverse_state *)private_data;
1077 struct db_record *rec;
1078 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1080 /* we have to give them a locked record to prevent races */
1081 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1082 if (rec && rec->value.dsize > 0) {
1083 ret = state->fn(rec, state->private_data);
1085 talloc_free(tmp_ctx);
/*
 * traverse method of the db_context: call fn for the records of the
 * database, handing it locked records.
 *
 * db:           database to walk.
 * fn:           called per record together with private_data.
 * private_data: opaque pointer passed through to fn.
 *
 * Persistent databases are walked locally with tdb_traverse() and its
 * result is returned. All other databases are walked through
 * ctdbd_traverse().
 * NOTE(review): the return value of the non-persistent path is not visible
 * here - confirm.
 */
1089 static int db_ctdb_traverse(struct db_context *db,
1090 int (*fn)(struct db_record *rec,
1091 void *private_data),
1094 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1095 struct db_ctdb_ctx);
1096 struct traverse_state state;
1100 state.private_data = private_data;
1102 if (db->persistent) {
1103 /* for persistent databases we don't need to do a ctdb traverse,
1104 we can do a faster local traverse */
1105 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1109 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
/*
 * Store callback for records handed out by the read-only traverse: always
 * refuses the write with NT_STATUS_MEDIA_WRITE_PROTECTED. None of the
 * arguments are used.
 */
1113 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1115 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Delete callback for records handed out by the read-only traverse:
 * always refuses the delete with NT_STATUS_MEDIA_WRITE_PROTECTED. The
 * argument is not used.
 */
1118 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1120 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Per-record callback passed to ctdbd_traverse() by
 * db_ctdb_traverse_read().
 *
 * A db_record is set up on the stack with the write-denying store and
 * delete callbacks and the database as private data, then the caller's
 * function is invoked on it. No lock is taken in the visible code.
 * NOTE(review): the assignments of the record's key and value are not
 * visible here - confirm.
 */
1123 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1125 struct traverse_state *state = (struct traverse_state *)private_data;
1126 struct db_record rec;
1129 rec.store = db_ctdb_store_deny;
1130 rec.delete_rec = db_ctdb_delete_deny;
1131 rec.private_data = state->db;
1132 state->fn(&rec, state->private_data);
/*
 * Per-record callback passed to tdb_traverse_read() for persistent
 * databases.
 *
 * A db_record is set up on the stack with the write-denying store and
 * delete callbacks and the database as private data. Records whose value
 * is not larger than an ltdb header are treated as deleted. For all
 * others the header is skipped and the result of the caller's function is
 * returned.
 * NOTE(review): the assignments of the record's key and value, and what is
 * returned for deleted records, are not visible here - confirm.
 */
1135 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1138 struct traverse_state *state = (struct traverse_state *)private_data;
1139 struct db_record rec;
1142 rec.store = db_ctdb_store_deny;
1143 rec.delete_rec = db_ctdb_delete_deny;
1144 rec.private_data = state->db;
1146 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1147 /* a deleted record */
1150 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1151 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1153 return state->fn(&rec, state->private_data);
/*
 * traverse_read method of the db_context: call fn for the records of the
 * database, handing it records that cannot be written to.
 *
 * db:           database to walk.
 * fn:           called per record together with private_data.
 * private_data: opaque pointer passed through to fn.
 *
 * Persistent databases are walked locally with tdb_traverse_read() and its
 * result is returned. All other databases are walked through
 * ctdbd_traverse().
 * NOTE(review): the return value of the non-persistent path is not visible
 * here - confirm.
 */
1156 static int db_ctdb_traverse_read(struct db_context *db,
1157 int (*fn)(struct db_record *rec,
1158 void *private_data),
1161 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1162 struct db_ctdb_ctx);
1163 struct traverse_state state;
1167 state.private_data = private_data;
1169 if (db->persistent) {
1170 /* for persistent databases we don't need to do a ctdb traverse,
1171 we can do a faster local traverse */
1172 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1175 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
/*
 * get_seqnum method of the db_context: returns tdb_get_seqnum() of the
 * local tdb. db->private_data must be a struct db_ctdb_ctx (checked with
 * talloc_get_type_abort).
 */
1179 static int db_ctdb_get_seqnum(struct db_context *db)
1181 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1182 struct db_ctdb_ctx);
1183 return tdb_get_seqnum(ctx->wtdb->tdb);
/*
 * Open a database through ctdbd and return a db_context whose methods are
 * the ctdb-aware implementations in this file.
 *
 * mem_ctx:    talloc parent of the returned context.
 * name:       database name handed to ctdbd_db_attach().
 * hash_size:  passed on to tdb_wrap_open().
 * tdb_flags:  TDB_CLEAR_IF_FIRST decides whether the database is treated
 *             as persistent; only TDB_SEQNUM is passed on to the tdb open.
 * open_flags: only O_CREAT is looked at: if set, mode is applied to the
 *             database file with chmod().
 * mode:       see open_flags.
 *
 * Nothing is opened when clustering is disabled (logged at level 10).
 * Failures to allocate, to attach to the database or to open the local
 * tdb free the partially built context.
 * NOTE(review): the return statements are not visible here, and the result
 * of chmod() is not checked - confirm both.
 */
1186 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1188 int hash_size, int tdb_flags,
1189 int open_flags, mode_t mode)
1191 struct db_context *result;
1192 struct db_ctdb_ctx *db_ctdb;
1195 if (!lp_clustering()) {
1196 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1200 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1201 DEBUG(0, ("talloc failed\n"));
1202 TALLOC_FREE(result);
1206 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1207 DEBUG(0, ("talloc failed\n"));
1208 TALLOC_FREE(result);
1212 db_ctdb->transaction = NULL;
1214 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1215 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1216 TALLOC_FREE(result);
1220 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
/* a database opened without TDB_CLEAR_IF_FIRST is treated as persistent */
1222 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1224 /* only pass through specific flags */
1225 tdb_flags &= TDB_SEQNUM;
1227 /* honor permissions if user has specified O_CREAT */
1228 if (open_flags & O_CREAT) {
1229 chmod(db_path, mode);
1232 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1233 if (db_ctdb->wtdb == NULL) {
1234 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1235 TALLOC_FREE(result);
1238 talloc_free(db_path);
1240 result->private_data = (void *)db_ctdb;
1241 result->fetch_locked = db_ctdb_fetch_locked;
1242 result->fetch = db_ctdb_fetch;
1243 result->traverse = db_ctdb_traverse;
1244 result->traverse_read = db_ctdb_traverse_read;
1245 result->get_seqnum = db_ctdb_get_seqnum;
1246 result->transaction_start = db_ctdb_transaction_start;
1247 result->transaction_commit = db_ctdb_transaction_commit;
1248 result->transaction_cancel = db_ctdb_transaction_cancel;
1250 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1251 name, db_ctdb->db_id));