2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of one open transaction on a ctdb database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/*
	 * Set while ctdb_replay_transaction() re-applies m_all, so that the
	 * replayed operations are not appended to m_all again.
	 * NOTE(review): member restored from a gap in the listing - confirm
	 * name and position against the upstream file.
	 */
	bool in_replay;
	/*
	 * we store the reads and writes done under a transaction:
	 * - one list stores both reads and writes (m_all),
	 * - the other just writes (m_write)
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/*
	 * Depth of nested transaction_start calls; 0 for the outermost one.
	 * NOTE(review): integer width restored from a gap - confirm.
	 */
	uint32_t nesting;
	/* set when a nested level was cancelled; makes the final commit fail */
	bool nested_cancel;
};
/*
 * Private data hung off a ctdb-backed struct db_context.
 */
struct db_ctdb_ctx {
	/* the dbwrap context that owns this structure */
	struct db_context *db;
	/* wrapper around the local tdb file holding this node's copy */
	struct tdb_wrap *wtdb;
	/*
	 * database id as handed out by ctdbd_db_attach()
	 * NOTE(review): member restored from a gap in the listing; it is
	 * printed with 0x%08x elsewhere, uint32_t assumed - confirm.
	 */
	uint32_t db_id;
	/* currently open transaction, NULL when there is none */
	struct db_ctdb_transaction_handle *transaction;
};
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
60 enum TDB_ERROR tret = tdb_error(tdb);
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
84 struct ctdb_ltdb_header *header,
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
98 header->dmaster = (uint32_t)-1;
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
115 + sizeof(struct ctdb_ltdb_header),
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
124 status = NT_STATUS_OK;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
137 struct ctdb_ltdb_header *header,
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
172 struct ctdb_ltdb_header *header,
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
206 struct ctdb_ltdb_header *header,
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
268 r = (struct ctdb_rec_data *)&m->data[0];
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
303 * CTDB transaction destructor
305 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
307 tdb_transaction_cancel(h->ctx->wtdb->tdb);
312 * start a transaction on a ctdb database:
313 * - lock the transaction lock key
314 * - start the tdb transaction
316 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
318 struct db_record *rh;
321 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
323 struct db_ctdb_ctx *ctx = h->ctx;
326 struct ctdb_ltdb_header header;
328 key.dptr = (uint8_t *)discard_const(keyname);
329 key.dsize = strlen(keyname);
332 tmp_ctx = talloc_new(h);
334 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
336 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
337 talloc_free(tmp_ctx);
342 ret = tdb_transaction_start(ctx->wtdb->tdb);
344 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
345 talloc_free(tmp_ctx);
349 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
350 if (!NT_STATUS_IS_OK(status) || header.dmaster != get_my_vnn()) {
351 tdb_transaction_cancel(ctx->wtdb->tdb);
352 talloc_free(tmp_ctx);
356 talloc_free(tmp_ctx);
363 * CTDB dbwrap API: transaction_start function
364 * starts a transaction on a persistent database
366 static int db_ctdb_transaction_start(struct db_context *db)
368 struct db_ctdb_transaction_handle *h;
370 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
373 if (!db->persistent) {
374 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
379 if (ctx->transaction) {
380 ctx->transaction->nesting++;
384 h = talloc_zero(db, struct db_ctdb_transaction_handle);
386 DEBUG(0,(__location__ " oom for transaction handle\n"));
392 ret = db_ctdb_transaction_fetch_start(h);
398 talloc_set_destructor(h, db_ctdb_transaction_destructor);
400 ctx->transaction = h;
402 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
410 fetch a record inside a transaction
412 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
414 TDB_DATA key, TDB_DATA *data)
416 struct db_ctdb_transaction_handle *h = db->transaction;
419 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
421 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
423 } else if (!NT_STATUS_IS_OK(status)) {
428 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
429 if (h->m_all == NULL) {
430 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
432 talloc_free(data->dptr);
441 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
442 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
444 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
448 struct db_record *result;
451 if (!(result = talloc(mem_ctx, struct db_record))) {
452 DEBUG(0, ("talloc failed\n"));
456 result->private_data = ctx->transaction;
458 result->key.dsize = key.dsize;
459 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
460 if (result->key.dptr == NULL) {
461 DEBUG(0, ("talloc failed\n"));
466 result->store = db_ctdb_store_transaction;
467 result->delete_rec = db_ctdb_delete_transaction;
469 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
470 if (ctdb_data.dptr == NULL) {
471 /* create the record */
472 result->value = tdb_null;
476 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
477 result->value.dptr = NULL;
479 if ((result->value.dsize != 0)
480 && !(result->value.dptr = (uint8 *)talloc_memdup(
481 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
482 result->value.dsize))) {
483 DEBUG(0, ("talloc failed\n"));
487 SAFE_FREE(ctdb_data.dptr);
492 static int db_ctdb_record_destructor(struct db_record **recp)
494 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
495 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
496 rec->private_data, struct db_ctdb_transaction_handle);
497 int ret = h->ctx->db->transaction_commit(h->ctx->db);
499 DEBUG(0,(__location__ " transaction_commit failed\n"));
505 auto-create a transaction for persistent databases
507 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
512 struct db_record *rec, **recp;
514 res = db_ctdb_transaction_start(ctx->db);
519 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
521 ctx->db->transaction_cancel(ctx->db);
525 /* destroy this transaction when we release the lock */
526 recp = talloc(rec, struct db_record *);
528 ctx->db->transaction_cancel(ctx->db);
533 talloc_set_destructor(recp, db_ctdb_record_destructor);
539 stores a record inside a transaction
541 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
542 TDB_DATA key, TDB_DATA data)
544 TALLOC_CTX *tmp_ctx = talloc_new(h);
547 struct ctdb_ltdb_header header;
550 /* we need the header so we can update the RSN */
551 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
552 if (rec.dptr == NULL) {
553 /* the record doesn't exist - create one with us as dmaster.
554 This is only safe because we are in a transaction and this
555 is a persistent database */
558 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
559 rec.dsize -= sizeof(struct ctdb_ltdb_header);
560 /* a special case, we are writing the same data that is there now */
561 if (data.dsize == rec.dsize &&
562 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
564 talloc_free(tmp_ctx);
570 header.dmaster = get_my_vnn();
574 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
575 if (h->m_all == NULL) {
576 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
577 talloc_free(tmp_ctx);
582 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
583 if (h->m_write == NULL) {
584 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
585 talloc_free(tmp_ctx);
589 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
590 if (NT_STATUS_IS_OK(status)) {
596 talloc_free(tmp_ctx);
603 a record store inside a transaction
605 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
607 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
608 rec->private_data, struct db_ctdb_transaction_handle);
611 ret = db_ctdb_transaction_store(h, rec->key, data);
613 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
619 a record delete inside a transaction
621 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
623 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
624 rec->private_data, struct db_ctdb_transaction_handle);
627 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
629 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
638 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
641 struct ctdb_rec_data *rec = NULL;
644 talloc_free(h->m_write);
647 ret = db_ctdb_transaction_fetch_start(h);
652 for (i=0;i<h->m_all->count;i++) {
655 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
657 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
661 if (rec->reqid == 0) {
663 if (db_ctdb_transaction_store(h, key, data) != 0) {
668 TALLOC_CTX *tmp_ctx = talloc_new(h);
670 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
671 talloc_free(tmp_ctx);
674 if (data2.dsize != data.dsize ||
675 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
676 /* the record has changed on us - we have to give up */
677 talloc_free(tmp_ctx);
680 talloc_free(tmp_ctx);
687 tdb_transaction_cancel(h->ctx->wtdb->tdb);
695 static int db_ctdb_transaction_commit(struct db_context *db)
697 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
703 struct db_ctdb_transaction_handle *h = ctx->transaction;
704 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
707 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
711 if (h->nested_cancel) {
712 db->transaction_cancel(db);
713 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
717 if (h->nesting != 0) {
722 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
724 talloc_set_destructor(h, NULL);
726 /* our commit strategy is quite complex.
728 - we first try to commit the changes to all other nodes
730 - if that works, then we commit locally and we are done
732 - if a commit on another node fails, then we need to cancel
733 the transaction, then restart the transaction (thus
734 opening a window of time for a pending recovery to
735 complete), then replay the transaction, checking all the
736 reads and writes (checking that reads give the same data,
737 and writes succeed). Then we retry the transaction to the
742 if (h->m_write == NULL) {
743 /* no changes were made, potentially after a retry */
744 tdb_transaction_cancel(h->ctx->wtdb->tdb);
746 ctx->transaction = NULL;
750 /* tell ctdbd to commit to the other nodes */
751 rets = ctdbd_control_local(messaging_ctdbd_connection(),
752 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
754 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
755 if (!NT_STATUS_IS_OK(rets) || status != 0) {
756 tdb_transaction_cancel(h->ctx->wtdb->tdb);
759 if (!NT_STATUS_IS_OK(rets)) {
760 failure_control = CTDB_CONTROL_TRANS2_ERROR;
762 /* work out what error code we will give if we
763 have to fail the operation */
764 switch ((enum ctdb_trans2_commit_error)status) {
765 case CTDB_TRANS2_COMMIT_SUCCESS:
766 case CTDB_TRANS2_COMMIT_SOMEFAIL:
767 case CTDB_TRANS2_COMMIT_TIMEOUT:
768 failure_control = CTDB_CONTROL_TRANS2_ERROR;
770 case CTDB_TRANS2_COMMIT_ALLFAIL:
771 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
776 if (++retries == 5) {
777 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
778 h->ctx->db_id, retries, (unsigned)failure_control));
779 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
780 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
781 tdb_null, NULL, NULL, NULL);
782 h->ctx->transaction = NULL;
784 ctx->transaction = NULL;
788 if (ctdb_replay_transaction(h) != 0) {
789 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
790 (unsigned)failure_control));
791 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
792 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
793 tdb_null, NULL, NULL, NULL);
794 h->ctx->transaction = NULL;
796 ctx->transaction = NULL;
801 failure_control = CTDB_CONTROL_TRANS2_ERROR;
804 /* do the real commit locally */
805 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
807 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
808 (unsigned)failure_control));
809 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
810 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
811 h->ctx->transaction = NULL;
816 /* tell ctdbd that we are finished with our local commit */
817 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
818 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
819 tdb_null, NULL, NULL, NULL);
820 h->ctx->transaction = NULL;
829 static int db_ctdb_transaction_cancel(struct db_context *db)
831 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
833 struct db_ctdb_transaction_handle *h = ctx->transaction;
836 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
840 if (h->nesting != 0) {
842 h->nested_cancel = true;
846 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
848 ctx->transaction = NULL;
854 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
856 struct db_ctdb_rec *crec = talloc_get_type_abort(
857 rec->private_data, struct db_ctdb_rec);
859 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
864 static NTSTATUS db_ctdb_delete(struct db_record *rec)
869 * We have to store the header with empty data. TODO: Fix the
875 return db_ctdb_store(rec, data, 0);
879 static int db_ctdb_record_destr(struct db_record* data)
881 struct db_ctdb_rec *crec = talloc_get_type_abort(
882 data->private_data, struct db_ctdb_rec);
884 DEBUG(10, (DEBUGLEVEL > 10
885 ? "Unlocking db %u key %s\n"
886 : "Unlocking db %u key %.20s\n",
887 (int)crec->ctdb_ctx->db_id,
888 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
891 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
892 DEBUG(0, ("tdb_chainunlock failed\n"));
899 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
904 struct db_record *result;
905 struct db_ctdb_rec *crec;
908 int migrate_attempts = 0;
910 if (!(result = talloc(mem_ctx, struct db_record))) {
911 DEBUG(0, ("talloc failed\n"));
915 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
916 DEBUG(0, ("talloc failed\n"));
921 result->private_data = (void *)crec;
922 crec->ctdb_ctx = ctx;
924 result->key.dsize = key.dsize;
925 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
926 if (result->key.dptr == NULL) {
927 DEBUG(0, ("talloc failed\n"));
933 * Do a blocking lock on the record
937 if (DEBUGLEVEL >= 10) {
938 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
939 DEBUG(10, (DEBUGLEVEL > 10
940 ? "Locking db %u key %s\n"
941 : "Locking db %u key %.20s\n",
942 (int)crec->ctdb_ctx->db_id, keystr));
946 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
947 DEBUG(3, ("tdb_chainlock failed\n"));
952 result->store = db_ctdb_store;
953 result->delete_rec = db_ctdb_delete;
954 talloc_set_destructor(result, db_ctdb_record_destr);
956 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
959 * See if we have a valid record and we are the dmaster. If so, we can
960 * take the shortcut and just return it.
963 if ((ctdb_data.dptr == NULL) ||
964 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
965 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
967 || (random() % 2 != 0)
970 SAFE_FREE(ctdb_data.dptr);
971 tdb_chainunlock(ctx->wtdb->tdb, key);
972 talloc_set_destructor(result, NULL);
974 migrate_attempts += 1;
976 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
977 ctdb_data.dptr, ctdb_data.dptr ?
978 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
981 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
982 if (!NT_STATUS_IS_OK(status)) {
983 DEBUG(5, ("ctdb_migrate failed: %s\n",
988 /* now its migrated, try again */
992 if (migrate_attempts > 10) {
993 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
997 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
999 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1000 result->value.dptr = NULL;
1002 if ((result->value.dsize != 0)
1003 && !(result->value.dptr = (uint8 *)talloc_memdup(
1004 result, ctdb_data.dptr + sizeof(crec->header),
1005 result->value.dsize))) {
1006 DEBUG(0, ("talloc failed\n"));
1007 TALLOC_FREE(result);
1010 SAFE_FREE(ctdb_data.dptr);
1015 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1016 TALLOC_CTX *mem_ctx,
1019 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1020 struct db_ctdb_ctx);
1022 if (ctx->transaction != NULL) {
1023 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1026 if (db->persistent) {
1027 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1030 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1034 fetch (unlocked, no migration) operation on ctdb
1036 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1037 TDB_DATA key, TDB_DATA *data)
1039 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1040 struct db_ctdb_ctx);
1044 if (ctx->transaction) {
1045 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1048 /* try a direct fetch */
1049 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1052 * See if we have a valid record and we are the dmaster. If so, we can
1053 * take the shortcut and just return it.
1054 * we bypass the dmaster check for persistent databases
1056 if ((ctdb_data.dptr != NULL) &&
1057 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1059 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1060 /* we are the dmaster - avoid the ctdb protocol op */
1062 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1063 if (data->dsize == 0) {
1064 SAFE_FREE(ctdb_data.dptr);
1069 data->dptr = (uint8 *)talloc_memdup(
1070 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1073 SAFE_FREE(ctdb_data.dptr);
1075 if (data->dptr == NULL) {
1081 SAFE_FREE(ctdb_data.dptr);
1083 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1084 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1085 if (!NT_STATUS_IS_OK(status)) {
1086 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * Context handed to the traverse callbacks below.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1099 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1101 struct traverse_state *state = (struct traverse_state *)private_data;
1102 struct db_record *rec;
1103 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1104 /* we have to give them a locked record to prevent races */
1105 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1106 if (rec && rec->value.dsize > 0) {
1107 state->fn(rec, state->private_data);
1109 talloc_free(tmp_ctx);
1112 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1115 struct traverse_state *state = (struct traverse_state *)private_data;
1116 struct db_record *rec;
1117 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1119 /* we have to give them a locked record to prevent races */
1120 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1121 if (rec && rec->value.dsize > 0) {
1122 ret = state->fn(rec, state->private_data);
1124 talloc_free(tmp_ctx);
1128 static int db_ctdb_traverse(struct db_context *db,
1129 int (*fn)(struct db_record *rec,
1130 void *private_data),
1133 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1134 struct db_ctdb_ctx);
1135 struct traverse_state state;
1139 state.private_data = private_data;
1141 if (db->persistent) {
1142 /* for persistent databases we don't need to do a ctdb traverse,
1143 we can do a faster local traverse */
1144 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1148 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1152 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1154 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1157 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1159 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1162 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1164 struct traverse_state *state = (struct traverse_state *)private_data;
1165 struct db_record rec;
1168 rec.store = db_ctdb_store_deny;
1169 rec.delete_rec = db_ctdb_delete_deny;
1170 rec.private_data = state->db;
1171 state->fn(&rec, state->private_data);
1174 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1177 struct traverse_state *state = (struct traverse_state *)private_data;
1178 struct db_record rec;
1181 rec.store = db_ctdb_store_deny;
1182 rec.delete_rec = db_ctdb_delete_deny;
1183 rec.private_data = state->db;
1185 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1186 /* a deleted record */
1189 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1190 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1192 return state->fn(&rec, state->private_data);
1195 static int db_ctdb_traverse_read(struct db_context *db,
1196 int (*fn)(struct db_record *rec,
1197 void *private_data),
1200 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1201 struct db_ctdb_ctx);
1202 struct traverse_state state;
1206 state.private_data = private_data;
1208 if (db->persistent) {
1209 /* for persistent databases we don't need to do a ctdb traverse,
1210 we can do a faster local traverse */
1211 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1214 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1218 static int db_ctdb_get_seqnum(struct db_context *db)
1220 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1221 struct db_ctdb_ctx);
1222 return tdb_get_seqnum(ctx->wtdb->tdb);
1225 static int db_ctdb_get_flags(struct db_context *db)
1227 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1228 struct db_ctdb_ctx);
1229 return tdb_get_flags(ctx->wtdb->tdb);
1232 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1234 int hash_size, int tdb_flags,
1235 int open_flags, mode_t mode)
1237 struct db_context *result;
1238 struct db_ctdb_ctx *db_ctdb;
1241 if (!lp_clustering()) {
1242 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1246 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1247 DEBUG(0, ("talloc failed\n"));
1248 TALLOC_FREE(result);
1252 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1253 DEBUG(0, ("talloc failed\n"));
1254 TALLOC_FREE(result);
1258 db_ctdb->transaction = NULL;
1259 db_ctdb->db = result;
1261 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1262 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1263 TALLOC_FREE(result);
1267 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1269 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1271 /* only pass through specific flags */
1272 tdb_flags &= TDB_SEQNUM;
1274 /* honor permissions if user has specified O_CREAT */
1275 if (open_flags & O_CREAT) {
1276 chmod(db_path, mode);
1279 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1280 if (db_ctdb->wtdb == NULL) {
1281 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1282 TALLOC_FREE(result);
1285 talloc_free(db_path);
1287 result->private_data = (void *)db_ctdb;
1288 result->fetch_locked = db_ctdb_fetch_locked;
1289 result->fetch = db_ctdb_fetch;
1290 result->traverse = db_ctdb_traverse;
1291 result->traverse_read = db_ctdb_traverse_read;
1292 result->get_seqnum = db_ctdb_get_seqnum;
1293 result->get_flags = db_ctdb_get_flags;
1294 result->transaction_start = db_ctdb_transaction_start;
1295 result->transaction_commit = db_ctdb_transaction_commit;
1296 result->transaction_cancel = db_ctdb_transaction_cancel;
1298 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1299 name, db_ctdb->db_id));