2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
29 /* we store the reads and writes done under a transaction: one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
37 struct db_context *db;
38 struct tdb_wrap *wtdb;
40 struct db_ctdb_transaction_handle *transaction;
44 struct db_ctdb_ctx *ctdb_ctx;
45 struct ctdb_ltdb_header header;
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 enum TDB_ERROR tret = tdb_error(tdb);
60 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
76 form a ctdb_rec_data record from a key/data pair
78 note that header may be NULL. If not NULL then it is included in the data portion
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
83 struct ctdb_ltdb_header *header,
87 struct ctdb_rec_data *d;
89 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
90 data.dsize + (header?sizeof(*header):0);
91 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
97 d->keylen = key.dsize;
98 memcpy(&d->data[0], key.dptr, key.dsize);
100 d->datalen = data.dsize + sizeof(*header);
101 memcpy(&d->data[key.dsize], header, sizeof(*header));
102 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
104 d->datalen = data.dsize;
105 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
113 struct ctdb_marshall_buffer *m,
117 struct ctdb_ltdb_header *header,
120 struct ctdb_rec_data *r;
121 size_t m_size, r_size;
122 struct ctdb_marshall_buffer *m2;
124 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
131 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
138 m_size = talloc_get_size(m);
139 r_size = talloc_get_size(r);
141 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
147 memcpy(m_size + (uint8_t *)m2, r, r_size);
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
160 data.dptr = (uint8_t *)m;
161 data.dsize = talloc_get_size(m);
166 loop over a marshalling buffer
168 - pass r==NULL to start
169 - loop the number of times indicated by m->count
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
173 struct ctdb_ltdb_header *header,
174 TDB_DATA *key, TDB_DATA *data)
177 r = (struct ctdb_rec_data *)&m->data[0];
179 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
187 key->dptr = &r->data[0];
188 key->dsize = r->keylen;
191 data->dptr = &r->data[r->keylen];
192 data->dsize = r->datalen;
193 if (header != NULL) {
194 data->dptr += sizeof(*header);
195 data->dsize -= sizeof(*header);
199 if (header != NULL) {
200 if (r->datalen < sizeof(*header)) {
203 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
211 /* cancel the underlying tdb transaction when the transaction handle is freed */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
214 tdb_transaction_cancel(h->ctx->wtdb->tdb);
218 /* fetch-lock the transaction lock record, then start the underlying tdb transaction */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
221 struct db_record *rh;
224 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
226 struct db_ctdb_ctx *ctx = h->ctx;
229 key.dptr = discard_const(keyname);
230 key.dsize = strlen(keyname);
233 tmp_ctx = talloc_new(h);
235 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
237 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
238 talloc_free(tmp_ctx);
243 ret = tdb_transaction_start(ctx->wtdb->tdb);
245 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246 talloc_free(tmp_ctx);
250 data = tdb_fetch(ctx->wtdb->tdb, key);
251 if ((data.dptr == NULL) ||
252 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254 SAFE_FREE(data.dptr);
255 tdb_transaction_cancel(ctx->wtdb->tdb);
256 talloc_free(tmp_ctx);
260 SAFE_FREE(data.dptr);
261 talloc_free(tmp_ctx);
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
270 struct db_ctdb_transaction_handle *h;
272 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
275 if (!db->persistent) {
276 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
281 if (ctx->transaction) {
282 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
286 h = talloc_zero(db, struct db_ctdb_transaction_handle);
288 DEBUG(0,(__location__ " oom for transaction handle\n"));
294 ret = db_ctdb_transaction_fetch_start(h);
300 talloc_set_destructor(h, db_ctdb_transaction_destructor);
302 ctx->transaction = h;
304 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
312 fetch a record inside a transaction
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
316 TDB_DATA key, TDB_DATA *data)
318 struct db_ctdb_transaction_handle *h = db->transaction;
320 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
322 if (data->dptr != NULL) {
323 uint8_t *oldptr = (uint8_t *)data->dptr;
324 data->dsize -= sizeof(struct ctdb_ltdb_header);
325 if (data->dsize == 0) {
328 data->dptr = (uint8 *)
330 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
334 if (data->dptr == NULL && data->dsize != 0) {
340 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341 if (h->m_all == NULL) {
342 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
344 talloc_free(data->dptr);
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
360 struct db_record *result;
363 if (!(result = talloc(mem_ctx, struct db_record))) {
364 DEBUG(0, ("talloc failed\n"));
368 result->private_data = ctx->transaction;
370 result->key.dsize = key.dsize;
371 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372 if (result->key.dptr == NULL) {
373 DEBUG(0, ("talloc failed\n"));
378 result->store = db_ctdb_store_transaction;
379 result->delete_rec = db_ctdb_delete_transaction;
381 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382 if (ctdb_data.dptr == NULL) {
383 /* create the record */
384 result->value = tdb_null;
388 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389 result->value.dptr = NULL;
391 if ((result->value.dsize != 0)
392 && !(result->value.dptr = (uint8 *)talloc_memdup(
393 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394 result->value.dsize))) {
395 DEBUG(0, ("talloc failed\n"));
399 SAFE_FREE(ctdb_data.dptr);
404 static int db_ctdb_record_destructor(struct db_record *rec)
406 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407 rec->private_data, struct db_ctdb_transaction_handle);
408 int ret = h->ctx->db->transaction_commit(h->ctx->db);
410 DEBUG(0,(__location__ " transaction_commit failed\n"));
416 auto-create a transaction for persistent databases
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
423 struct db_record *rec;
425 res = db_ctdb_transaction_start(ctx->db);
430 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
432 ctx->db->transaction_cancel(ctx->db);
436 /* commit this transaction when we release the lock */
437 talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
443 stores a record inside a transaction
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
446 TDB_DATA key, TDB_DATA data)
448 TALLOC_CTX *tmp_ctx = talloc_new(h);
451 struct ctdb_ltdb_header header;
453 /* we need the header so we can update the RSN */
454 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455 if (rec.dptr == NULL) {
456 /* the record doesn't exist - create one with us as dmaster.
457 This is only safe because we are in a transaction and this
458 is a persistent database */
460 header.dmaster = get_my_vnn();
462 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
463 rec.dsize -= sizeof(struct ctdb_ltdb_header);
464 /* a special case, we are writing the same data that is there now */
465 if (data.dsize == rec.dsize &&
466 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
468 talloc_free(tmp_ctx);
477 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
478 if (h->m_all == NULL) {
479 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
480 talloc_free(tmp_ctx);
485 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
486 if (h->m_write == NULL) {
487 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
488 talloc_free(tmp_ctx);
492 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
493 rec.dptr = talloc_size(tmp_ctx, rec.dsize);
494 if (rec.dptr == NULL) {
495 DEBUG(0,(__location__ " Failed to alloc record\n"));
496 talloc_free(tmp_ctx);
499 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
500 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
502 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
504 talloc_free(tmp_ctx);
511 a record store inside a transaction
513 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
515 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
516 rec->private_data, struct db_ctdb_transaction_handle);
519 ret = db_ctdb_transaction_store(h, rec->key, data);
521 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
527 a record delete inside a transaction
529 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
531 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
532 rec->private_data, struct db_ctdb_transaction_handle);
535 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
537 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
546 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
549 struct ctdb_rec_data *rec = NULL;
552 talloc_free(h->m_write);
555 ret = db_ctdb_transaction_fetch_start(h);
560 for (i=0;i<h->m_all->count;i++) {
563 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
565 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
569 if (rec->reqid == 0) {
571 if (db_ctdb_transaction_store(h, key, data) != 0) {
576 TALLOC_CTX *tmp_ctx = talloc_new(h);
578 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
579 talloc_free(tmp_ctx);
582 if (data2.dsize != data.dsize ||
583 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
584 /* the record has changed on us - we have to give up */
585 talloc_free(tmp_ctx);
588 talloc_free(tmp_ctx);
595 tdb_transaction_cancel(h->ctx->wtdb->tdb);
603 static int db_ctdb_transaction_commit(struct db_context *db)
605 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
611 struct db_ctdb_transaction_handle *h = ctx->transaction;
612 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
615 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
619 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
621 talloc_set_destructor(h, NULL);
623 /* our commit strategy is quite complex.
625 - we first try to commit the changes to all other nodes
627 - if that works, then we commit locally and we are done
629 - if a commit on another node fails, then we need to cancel
630 the transaction, then restart the transaction (thus
631 opening a window of time for a pending recovery to
632 complete), then replay the transaction, checking all the
633 reads and writes (checking that reads give the same data,
634 and writes succeed). Then we retry the transaction to the
639 if (h->m_write == NULL) {
640 /* no changes were made, potentially after a retry */
641 tdb_transaction_cancel(h->ctx->wtdb->tdb);
643 ctx->transaction = NULL;
647 /* tell ctdbd to commit to the other nodes */
648 rets = ctdbd_control_local(messaging_ctdbd_connection(),
649 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
651 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
652 if (!NT_STATUS_IS_OK(rets) || status != 0) {
653 tdb_transaction_cancel(h->ctx->wtdb->tdb);
656 if (!NT_STATUS_IS_OK(rets)) {
657 failure_control = CTDB_CONTROL_TRANS2_ERROR;
659 /* work out what error code we will give if we
660 have to fail the operation */
661 switch ((enum ctdb_trans2_commit_error)status) {
662 case CTDB_TRANS2_COMMIT_SUCCESS:
663 case CTDB_TRANS2_COMMIT_SOMEFAIL:
664 case CTDB_TRANS2_COMMIT_TIMEOUT:
665 failure_control = CTDB_CONTROL_TRANS2_ERROR;
667 case CTDB_TRANS2_COMMIT_ALLFAIL:
668 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
673 if (++retries == 10) {
674 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
675 h->ctx->db_id, retries, (unsigned)failure_control));
676 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
677 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
678 tdb_null, NULL, NULL, NULL);
679 h->ctx->transaction = NULL;
681 ctx->transaction = NULL;
685 if (ctdb_replay_transaction(h) != 0) {
686 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
687 (unsigned)failure_control));
688 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
689 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
690 tdb_null, NULL, NULL, NULL);
691 h->ctx->transaction = NULL;
693 ctx->transaction = NULL;
698 failure_control = CTDB_CONTROL_TRANS2_ERROR;
701 /* do the real commit locally */
702 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
704 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
705 (unsigned)failure_control));
706 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
707 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
708 h->ctx->transaction = NULL;
713 /* tell ctdbd that we are finished with our local commit */
714 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
715 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
716 tdb_null, NULL, NULL, NULL);
717 h->ctx->transaction = NULL;
726 static int db_ctdb_transaction_cancel(struct db_context *db)
728 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
730 struct db_ctdb_transaction_handle *h = ctx->transaction;
733 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
737 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
739 ctx->transaction = NULL;
745 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
747 struct db_ctdb_rec *crec = talloc_get_type_abort(
748 rec->private_data, struct db_ctdb_rec);
752 cdata.dsize = sizeof(crec->header) + data.dsize;
754 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
755 return NT_STATUS_NO_MEMORY;
758 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
759 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
761 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
763 SAFE_FREE(cdata.dptr);
765 return (ret == 0) ? NT_STATUS_OK
766 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
771 static NTSTATUS db_ctdb_delete(struct db_record *rec)
776 * We have to store the header with empty data. TODO: Fix the
782 return db_ctdb_store(rec, data, 0);
786 static int db_ctdb_record_destr(struct db_record* data)
788 struct db_ctdb_rec *crec = talloc_get_type_abort(
789 data->private_data, struct db_ctdb_rec);
791 DEBUG(10, (DEBUGLEVEL > 10
792 ? "Unlocking db %u key %s\n"
793 : "Unlocking db %u key %.20s\n",
794 (int)crec->ctdb_ctx->db_id,
795 hex_encode(data, (unsigned char *)data->key.dptr,
798 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
799 DEBUG(0, ("tdb_chainunlock failed\n"));
806 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
811 struct db_record *result;
812 struct db_ctdb_rec *crec;
815 int migrate_attempts = 0;
817 if (!(result = talloc(mem_ctx, struct db_record))) {
818 DEBUG(0, ("talloc failed\n"));
822 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
823 DEBUG(0, ("talloc failed\n"));
828 result->private_data = (void *)crec;
829 crec->ctdb_ctx = ctx;
831 result->key.dsize = key.dsize;
832 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
833 if (result->key.dptr == NULL) {
834 DEBUG(0, ("talloc failed\n"));
840 * Do a blocking lock on the record
844 if (DEBUGLEVEL >= 10) {
845 char *keystr = hex_encode(result, key.dptr, key.dsize);
846 DEBUG(10, (DEBUGLEVEL > 10
847 ? "Locking db %u key %s\n"
848 : "Locking db %u key %.20s\n",
849 (int)crec->ctdb_ctx->db_id, keystr));
853 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
854 DEBUG(3, ("tdb_chainlock failed\n"));
859 result->store = db_ctdb_store;
860 result->delete_rec = db_ctdb_delete;
861 talloc_set_destructor(result, db_ctdb_record_destr);
863 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
866 * See if we have a valid record and we are the dmaster. If so, we can
867 * take the shortcut and just return it.
870 if ((ctdb_data.dptr == NULL) ||
871 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
872 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
874 || (random() % 2 != 0)
877 SAFE_FREE(ctdb_data.dptr);
878 tdb_chainunlock(ctx->wtdb->tdb, key);
879 talloc_set_destructor(result, NULL);
881 migrate_attempts += 1;
883 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
884 ctdb_data.dptr, ctdb_data.dptr ?
885 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
888 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
889 if (!NT_STATUS_IS_OK(status)) {
890 DEBUG(5, ("ctdb_migrate failed: %s\n",
895 /* now it's migrated, try again */
899 if (migrate_attempts > 10) {
900 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
904 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
906 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
907 result->value.dptr = NULL;
909 if ((result->value.dsize != 0)
910 && !(result->value.dptr = (uint8 *)talloc_memdup(
911 result, ctdb_data.dptr + sizeof(crec->header),
912 result->value.dsize))) {
913 DEBUG(0, ("talloc failed\n"));
917 SAFE_FREE(ctdb_data.dptr);
922 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
926 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
929 if (ctx->transaction != NULL) {
930 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
933 if (db->persistent) {
934 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
937 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
941 fetch (unlocked, no migration) operation on ctdb
943 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
944 TDB_DATA key, TDB_DATA *data)
946 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
951 if (ctx->transaction) {
952 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
955 /* try a direct fetch */
956 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
959 * See if we have a valid record and we are the dmaster. If so, we can
960 * take the shortcut and just return it.
961 * we bypass the dmaster check for persistent databases
963 if ((ctdb_data.dptr != NULL) &&
964 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
966 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
967 /* we are the dmaster - avoid the ctdb protocol op */
969 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
970 if (data->dsize == 0) {
971 SAFE_FREE(ctdb_data.dptr);
976 data->dptr = (uint8 *)talloc_memdup(
977 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
980 SAFE_FREE(ctdb_data.dptr);
982 if (data->dptr == NULL) {
988 SAFE_FREE(ctdb_data.dptr);
990 /* we weren't able to get it locally - ask ctdb to fetch it for us */
991 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
992 if (!NT_STATUS_IS_OK(status)) {
993 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1000 struct traverse_state {
1001 struct db_context *db;
1002 int (*fn)(struct db_record *rec, void *private_data);
1006 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1008 struct traverse_state *state = (struct traverse_state *)private_data;
1009 struct db_record *rec;
1010 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1011 /* we have to give them a locked record to prevent races */
1012 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1013 if (rec && rec->value.dsize > 0) {
1014 state->fn(rec, state->private_data);
1016 talloc_free(tmp_ctx);
1019 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1022 struct traverse_state *state = (struct traverse_state *)private_data;
1023 struct db_record *rec;
1024 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1026 /* we have to give them a locked record to prevent races */
1027 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1028 if (rec && rec->value.dsize > 0) {
1029 ret = state->fn(rec, state->private_data);
1031 talloc_free(tmp_ctx);
1035 static int db_ctdb_traverse(struct db_context *db,
1036 int (*fn)(struct db_record *rec,
1037 void *private_data),
1040 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1041 struct db_ctdb_ctx);
1042 struct traverse_state state;
1046 state.private_data = private_data;
1048 if (db->persistent) {
1049 /* for persistent databases we don't need to do a ctdb traverse,
1050 we can do a faster local traverse */
1051 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1055 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1059 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1061 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1064 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1066 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1069 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1071 struct traverse_state *state = (struct traverse_state *)private_data;
1072 struct db_record rec;
1075 rec.store = db_ctdb_store_deny;
1076 rec.delete_rec = db_ctdb_delete_deny;
1077 rec.private_data = state->db;
1078 state->fn(&rec, state->private_data);
1081 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1084 struct traverse_state *state = (struct traverse_state *)private_data;
1085 struct db_record rec;
1088 rec.store = db_ctdb_store_deny;
1089 rec.delete_rec = db_ctdb_delete_deny;
1090 rec.private_data = state->db;
1092 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1093 /* a deleted record */
1096 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1097 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1099 return state->fn(&rec, state->private_data);
1102 static int db_ctdb_traverse_read(struct db_context *db,
1103 int (*fn)(struct db_record *rec,
1104 void *private_data),
1107 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1108 struct db_ctdb_ctx);
1109 struct traverse_state state;
1113 state.private_data = private_data;
1115 if (db->persistent) {
1116 /* for persistent databases we don't need to do a ctdb traverse,
1117 we can do a faster local traverse */
1118 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1121 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1125 static int db_ctdb_get_seqnum(struct db_context *db)
1127 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1128 struct db_ctdb_ctx);
1129 return tdb_get_seqnum(ctx->wtdb->tdb);
1132 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1134 int hash_size, int tdb_flags,
1135 int open_flags, mode_t mode)
1137 struct db_context *result;
1138 struct db_ctdb_ctx *db_ctdb;
1141 if (!lp_clustering()) {
1142 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1146 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1147 DEBUG(0, ("talloc failed\n"));
1148 TALLOC_FREE(result);
1152 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1153 DEBUG(0, ("talloc failed\n"));
1154 TALLOC_FREE(result);
1158 db_ctdb->transaction = NULL;
1159 db_ctdb->db = result;
1161 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1162 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1163 TALLOC_FREE(result);
1167 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1169 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1171 /* only pass through specific flags */
1172 tdb_flags &= TDB_SEQNUM;
1174 /* honor permissions if user has specified O_CREAT */
1175 if (open_flags & O_CREAT) {
1176 chmod(db_path, mode);
1179 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1180 if (db_ctdb->wtdb == NULL) {
1181 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1182 TALLOC_FREE(result);
1185 talloc_free(db_path);
1187 result->private_data = (void *)db_ctdb;
1188 result->fetch_locked = db_ctdb_fetch_locked;
1189 result->fetch = db_ctdb_fetch;
1190 result->traverse = db_ctdb_traverse;
1191 result->traverse_read = db_ctdb_traverse_read;
1192 result->get_seqnum = db_ctdb_get_seqnum;
1193 result->transaction_start = db_ctdb_transaction_start;
1194 result->transaction_commit = db_ctdb_transaction_commit;
1195 result->transaction_cancel = db_ctdb_transaction_cancel;
1197 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1198 name, db_ctdb->db_id));