2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
  State kept for one open transaction on a ctdb database.

  ctx     - the database context the transaction runs on
  m_all   - marshall buffer logging both the reads and the writes done
            under the transaction (walked again when the transaction is
            replayed)
  m_write - marshall buffer logging only the writes (handed to ctdbd at
            commit time)

  NOTE(review): code in this file also uses h->nesting and
  h->nested_cancel, so the struct has members beyond those listed here.
*/
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
/*
  Members of the per-database context (used below as struct db_ctdb_ctx;
  the line opening the struct is not visible in this listing).

  db          - the generic db_context that wraps this backend
  wtdb        - wrapper around the local tdb handle (accessed as wtdb->tdb)
  transaction - the currently open transaction handle, NULL when none

  NOTE(review): code in this file also reads ctx->db_id, so at least one
  further member exists that is not shown here.
*/
39 struct db_context *db;
40 struct tdb_wrap *wtdb;
42 struct db_ctdb_transaction_handle *transaction;
/*
  Members of the private data attached to a locked record (used below as
  struct db_ctdb_rec; the line opening the struct is not visible here).

  ctdb_ctx - the database context the record was fetched from
  header   - copy of the ctdb_ltdb_header found in front of the stored
             data; db_ctdb_store writes it back in front of the new data
*/
46 struct db_ctdb_ctx *ctdb_ctx;
47 struct ctdb_ltdb_header header;
/* forward declaration: defined further down, but already needed by
   db_ctdb_transaction_fetch_start */
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
/*
  Translate the last error recorded on a tdb context (tdb_error()) into an
  NTSTATUS value.

  The visible assignments yield NT_STATUS_OBJECT_NAME_COLLISION,
  NT_STATUS_OBJECT_NAME_NOT_FOUND or NT_STATUS_INTERNAL_DB_CORRUPTION.

  NOTE(review): the conditions selecting each status are not visible in
  this listing (presumably a switch on tret) - confirm the mapping against
  the TDB_ERROR values before relying on it.
*/
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 enum TDB_ERROR tret = tdb_error(tdb);
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
/*
  form a ctdb_rec_data record from a key/data pair

  note that header may be NULL. If it is not NULL, it is included in the
  data portion, in front of the data itself
*/
/*
  Allocate (on mem_ctx) and fill one ctdb_rec_data record.

  Layout of d->data: the key bytes first, then - when a header is given -
  the ltdb header, then the data bytes. d->keylen is key.dsize; d->datalen
  is data.dsize, plus sizeof(*header) when the header is included.

  NOTE(review): the length computation treats header as optional, so the
  two datalen/memcpy variants below are presumably the two arms of an
  if (header) / else whose branch lines are not visible here.
  NOTE(review): memcpy is called with data.dptr even when data.dsize may be
  0 (the delete path stores tdb_null); confirm that a NULL source with a
  zero length is acceptable on all target platforms.
*/
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
85 struct ctdb_ltdb_header *header,
89 struct ctdb_rec_data *d;
/* fixed part of the record + key + data (+ header if present) */
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
99 d->keylen = key.dsize;
100 memcpy(&d->data[0], key.dptr, key.dsize);
/* variant with header: header sits between key and data */
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
/* variant without header: data follows the key directly */
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
113 /* helper function for marshalling multiple records */
/*
  Build one record with db_ctdb_marshall_record() and append it to the
  marshall buffer m, returning the (possibly reallocated) buffer.

  The current buffer length is taken from talloc_get_size(m), so the new
  record is copied in directly behind the existing contents. A fresh
  buffer is zero-allocated with room for just the fixed part of
  struct ctdb_marshall_buffer (presumably when m is NULL - the condition
  is not visible in this listing).

  NOTE(review): r is allocated on mem_ctx; its release after the copy, the
  error returns and any count/db_id bookkeeping are not visible here.
*/
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
119 struct ctdb_ltdb_header *header,
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2;
126 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
133 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
/* the talloc chunk sizes double as the marshalled lengths */
141 m_size = talloc_get_size(m);
142 r_size = talloc_get_size(r);
/* grow into m2 rather than overwriting m, so m stays valid on failure */
144 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145 mem_ctx, m, m_size + r_size);
/* append the new record at the old end of the buffer */
151 memcpy(m_size + (uint8_t *)m2, r, r_size);
160 /* we've finished marshalling, return a data blob with the marshalled records.
   The blob points at the marshall buffer itself (no copy is made) and its
   size is the talloc size of that buffer, so it is only valid for as long
   as m is. */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
164 data.dptr = (uint8_t *)m;
165 data.dsize = talloc_get_size(m);
/*
   loop over a marshalling buffer

     - pass r==NULL to start
     - loop the number of times indicated by m->count
*/
/*
  Step to the next record of marshall buffer m.

  The first record starts at m->data[0]; each following record starts
  r->length bytes after the previous one. The function itself does not
  look at m->count in the visible lines - the caller bounds the loop.

  Outputs, as far as visible here:
    key    - set to point at the key bytes inside the buffer (no copy)
    data   - set to point at the data bytes inside the buffer (no copy);
             when header != NULL the leading ctdb_ltdb_header is skipped
    header - receives a copy of the ltdb header stored in front of the data

  NOTE(review): ctdb_replay_transaction passes NULL for header and for the
  parameter before it; the NULL checks guarding key/data and the choice
  between the first-record and next-record assignment of r are presumably
  on lines not shown.
  NOTE(review): the header is read through a cast of &r->data[r->keylen];
  that address is not guaranteed to be suitably aligned for
  struct ctdb_ltdb_header - a memcpy would avoid the question.
*/
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
177 struct ctdb_ltdb_header *header,
178 TDB_DATA *key, TDB_DATA *data)
/* first record of the buffer */
181 r = (struct ctdb_rec_data *)&m->data[0];
/* advance past the current record */
183 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
191 key->dptr = &r->data[0];
192 key->dsize = r->keylen;
195 data->dptr = &r->data[r->keylen];
196 data->dsize = r->datalen;
/* a requested header is not part of the data handed back */
197 if (header != NULL) {
198 data->dptr += sizeof(*header);
199 data->dsize -= sizeof(*header);
203 if (header != NULL) {
/* record too short to contain a header */
204 if (r->datalen < sizeof(*header)) {
207 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
215 /* talloc destructor for a transaction handle (installed by
   db_ctdb_transaction_start): cancels the transaction on the local tdb */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
218 tdb_transaction_cancel(h->ctx->wtdb->tdb);
222 /* start a transaction on a database: fetch-lock the ctdb transaction lock
   record (CTDB_TRANSACTION_LOCK_KEY), start a transaction on the local tdb
   and then verify that the lock record is still valid and that this node
   is its dmaster.

   NOTE(review): the return statements and whatever follows a failed
   verification (retry or error return) are not visible in this listing. */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
225 struct db_record *rh;
228 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
230 struct db_ctdb_ctx *ctx = h->ctx;
/* the key is the lock key name without its trailing NUL */
233 key.dptr = (uint8_t *)discard_const(keyname);
234 key.dsize = strlen(keyname);
/* scratch context; everything hung off it is released before returning */
237 tmp_ctx = talloc_new(h);
/* take the lock record through the normal fetch-locked path */
239 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
241 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
242 talloc_free(tmp_ctx);
247 ret = tdb_transaction_start(ctx->wtdb->tdb);
249 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250 talloc_free(tmp_ctx);
/* re-read the lock record: it must exist, be large enough to hold an ltdb
   header and still name this node as dmaster; otherwise the tdb
   transaction just started is cancelled again */
254 data = tdb_fetch(ctx->wtdb->tdb, key);
255 if ((data.dptr == NULL) ||
256 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258 SAFE_FREE(data.dptr);
259 tdb_transaction_cancel(ctx->wtdb->tdb);
260 talloc_free(tmp_ctx);
264 SAFE_FREE(data.dptr);
265 talloc_free(tmp_ctx);
271 /* start a transaction on a database (the transaction_start method of the
   db_context).

   - non-persistent databases are rejected with a level 0 debug message
   - if a transaction is already open only its nesting counter is bumped
   - otherwise a zeroed handle is allocated as a talloc child of db, the
     transaction is started via db_ctdb_transaction_fetch_start(), the
     cancel-on-free destructor is installed and the handle is published
     in ctx->transaction

   NOTE(review): the return statements and the initialisation of h->ctx
   are not visible in this listing. */
272 static int db_ctdb_transaction_start(struct db_context *db)
274 struct db_ctdb_transaction_handle *h;
276 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
279 if (!db->persistent) {
280 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
/* nested start: just count it */
285 if (ctx->transaction) {
286 ctx->transaction->nesting++;
290 h = talloc_zero(db, struct db_ctdb_transaction_handle);
292 DEBUG(0,(__location__ " oom for transaction handle\n"));
298 ret = db_ctdb_transaction_fetch_start(h);
/* from here on freeing h cancels the tdb transaction */
304 talloc_set_destructor(h, db_ctdb_transaction_destructor);
306 ctx->transaction = h;
308 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
/*
  fetch a record inside a transaction
*/
/*
  Read a record from the local tdb while a transaction is open.

  The leading ctdb_ltdb_header is stripped from what tdb_fetch() returns
  and the remaining bytes are copied for the caller (the call takes
  mem_ctx as its first argument - presumably talloc_memdup, as in
  db_ctdb_fetch). The read is then logged in h->m_all with reqid 1 so the
  transaction can be replayed later.

  NOTE(review): the return statements, the handling of a zero-length
  payload and the release of the tdb_fetch() buffer (oldptr) are not
  visible in this listing.
*/
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
320 TDB_DATA key, TDB_DATA *data)
322 struct db_ctdb_transaction_handle *h = db->transaction;
324 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
326 if (data->dptr != NULL) {
327 uint8_t *oldptr = (uint8_t *)data->dptr;
/* NOTE(review): dsize is reduced without first checking that the record
   is at least sizeof(struct ctdb_ltdb_header) long; a shorter record
   would make the size wrap. db_ctdb_fetch checks this before
   subtracting - consider doing the same here. */
328 data->dsize -= sizeof(struct ctdb_ltdb_header);
329 if (data->dsize == 0) {
332 data->dptr = (uint8 *)
334 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
/* a NULL pointer with a non-zero size means the copy failed */
338 if (data->dptr == NULL && data->dsize != 0) {
/* log the read (reqid 1, no header) for a possible replay */
344 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345 if (h->m_all == NULL) {
346 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
348 talloc_free(data->dptr);
/* forward declarations: installed as the store/delete_rec callbacks of
   records handed out by db_ctdb_fetch_locked_transaction */
357 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
/*
  Build a db_record for a key while a transaction is open.

  The record is allocated on mem_ctx, carries the open transaction handle
  as private_data, owns a copy of the key and uses the transaction-aware
  store/delete callbacks. Its value is the local tdb content with the
  ctdb_ltdb_header stripped; a key without a stored record gets tdb_null.

  No tdb_chainlock call is visible in this function, in contrast to
  fetch_locked_internal.

  NOTE(review): the value size is computed by subtracting the header size
  from ctdb_data.dsize; no check that the stored record is at least that
  long is visible in this listing.
  NOTE(review): return statements and cleanup on the failure paths are not
  visible here.
*/
360 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
364 struct db_record *result;
367 if (!(result = talloc(mem_ctx, struct db_record))) {
368 DEBUG(0, ("talloc failed\n"));
372 result->private_data = ctx->transaction;
/* the record keeps its own copy of the key */
374 result->key.dsize = key.dsize;
375 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376 if (result->key.dptr == NULL) {
377 DEBUG(0, ("talloc failed\n"));
382 result->store = db_ctdb_store_transaction;
383 result->delete_rec = db_ctdb_delete_transaction;
385 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386 if (ctdb_data.dptr == NULL) {
387 /* create the record */
388 result->value = tdb_null;
392 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393 result->value.dptr = NULL;
/* copy the payload only when there is one; dptr stays NULL for an
   empty value */
395 if ((result->value.dsize != 0)
396 && !(result->value.dptr = (uint8 *)talloc_memdup(
397 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398 result->value.dsize))) {
399 DEBUG(0, ("talloc failed\n"));
/* the tdb_fetch() buffer is no longer needed once copied */
403 SAFE_FREE(ctdb_data.dptr);
/*
  talloc destructor installed by db_ctdb_fetch_locked_persistent on the
  helper pointer hanging off an auto-transaction record.

  recp points at the record; the record's private_data is the transaction
  handle. The destructor commits that transaction through the database's
  transaction_commit method and logs at level 0 if the commit fails.
*/
408 static int db_ctdb_record_destructor(struct db_record **recp)
410 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
411 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
412 rec->private_data, struct db_ctdb_transaction_handle);
413 int ret = h->ctx->db->transaction_commit(h->ctx->db);
415 DEBUG(0,(__location__ " transaction_commit failed\n"));
/*
  auto-create a transaction for persistent databases
*/
/*
  fetch_locked for a persistent database when no transaction is open.

  A transaction is started, the record is fetched under it, and a small
  helper object (a struct db_record pointer) is allocated as a talloc
  child of the record with db_ctdb_record_destructor attached, so that
  freeing the record commits the transaction. If fetching the record or
  allocating the helper fails the transaction is cancelled again.

  NOTE(review): the destructor dereferences *recp; the assignment of rec
  to *recp and the return statements are not visible in this listing.
*/
423 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
428 struct db_record *rec, **recp;
430 res = db_ctdb_transaction_start(ctx->db);
435 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
437 ctx->db->transaction_cancel(ctx->db);
441 /* destroy this transaction when we release the lock */
442 recp = talloc(rec, struct db_record *);
444 ctx->db->transaction_cancel(ctx->db);
448 talloc_set_destructor(recp, db_ctdb_record_destructor);
/*
  store a record inside a transaction
*/
/*
  Write key/data to the local tdb under the open transaction and log the
  write for ctdbd.

  Steps visible here:
    - fetch the existing record to obtain its ltdb header; if there is no
      record a header naming this node as dmaster is used instead
    - if the stored payload already equals data, skip the write
    - log the write in h->m_all (reqid 0, no header) and in h->m_write
      (reqid 0, with header)
    - build header+data in a scratch buffer and tdb_store() it with
      TDB_REPLACE

  NOTE(review): the header is copied from rec.dptr and rec.dsize is
  reduced by the header size without a visible check that the stored
  record is at least sizeof(struct ctdb_ltdb_header) long.
  NOTE(review): tdb_fetch() results are released with SAFE_FREE elsewhere
  in this file; the release of rec.dptr, the zeroing of a fresh header,
  the RSN update mentioned below and the return statements are not visible
  in this listing.
*/
456 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
457 TDB_DATA key, TDB_DATA data)
459 TALLOC_CTX *tmp_ctx = talloc_new(h);
462 struct ctdb_ltdb_header header;
464 /* we need the header so we can update the RSN */
465 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
466 if (rec.dptr == NULL) {
467 /* the record doesn't exist - create one with us as dmaster.
468 This is only safe because we are in a transaction and this
469 is a persistent database */
471 header.dmaster = get_my_vnn();
473 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
474 rec.dsize -= sizeof(struct ctdb_ltdb_header);
475 /* a special case, we are writing the same data that is there now */
476 if (data.dsize == rec.dsize &&
477 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
479 talloc_free(tmp_ctx);
/* log the write for replay: reqid 0, no header */
488 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
489 if (h->m_all == NULL) {
490 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
491 talloc_free(tmp_ctx);
/* log the write for ctdbd: same record, but with the ltdb header */
496 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
497 if (h->m_write == NULL) {
498 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
499 talloc_free(tmp_ctx);
/* what goes into the local tdb is header followed by data */
503 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
504 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
505 if (rec.dptr == NULL) {
506 DEBUG(0,(__location__ " Failed to alloc record\n"));
507 talloc_free(tmp_ctx);
510 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
511 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
513 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
515 talloc_free(tmp_ctx);
/*
  a record store inside a transaction
*/
/*
  store callback of records fetched while a transaction is open.

  Takes the transaction handle from rec->private_data and writes the data
  through db_ctdb_transaction_store(); a tdb error is translated with
  tdb_error_to_ntstatus(). The flag argument is not used in the visible
  lines.

  NOTE(review): the condition guarding the error return and the success
  return are not visible in this listing.
*/
524 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
526 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
527 rec->private_data, struct db_ctdb_transaction_handle);
530 ret = db_ctdb_transaction_store(h, rec->key, data);
532 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
  a record delete inside a transaction
*/
/*
  delete_rec callback of records fetched while a transaction is open.

  A delete is expressed as storing tdb_null for the key through
  db_ctdb_transaction_store(); a tdb error is translated with
  tdb_error_to_ntstatus().

  NOTE(review): the condition guarding the error return and the success
  return are not visible in this listing.
*/
540 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
542 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
543 rec->private_data, struct db_ctdb_transaction_handle);
546 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
548 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
  Replay a transaction after a failed commit attempt.

  The write log (h->m_write) is discarded, the transaction is restarted
  with db_ctdb_transaction_fetch_start() and the combined log h->m_all is
  walked for m_all->count records:
    - records with reqid 0 are writes and are applied again with
      db_ctdb_transaction_store()
    - all other records are reads; the key is fetched again and the
      result must match the logged data in size and content, otherwise
      the replay gives up

  NOTE(review): h->m_write is freed here; it has to be reset to NULL before
  db_ctdb_transaction_store() appends to it again - that line is not
  visible in this listing.
  NOTE(review): as visible in this listing, db_ctdb_transaction_store()
  and db_ctdb_transaction_fetch() both append to h->m_all, i.e. to the
  buffer being iterated (which may be reallocated, and whose count is the
  loop bound). Confirm how replay avoids growing or moving m_all under
  rec.
  NOTE(review): the return statements and the label or branch leading to
  the final tdb_transaction_cancel() are not visible here.
*/
557 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
560 struct ctdb_rec_data *rec = NULL;
563 talloc_free(h->m_write);
566 ret = db_ctdb_transaction_fetch_start(h);
571 for (i=0;i<h->m_all->count;i++) {
574 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
576 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
/* reqid 0 marks a logged write */
580 if (rec->reqid == 0) {
582 if (db_ctdb_transaction_store(h, key, data) != 0) {
/* otherwise a logged read: fetch again and compare */
587 TALLOC_CTX *tmp_ctx = talloc_new(h);
589 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
590 talloc_free(tmp_ctx);
593 if (data2.dsize != data.dsize ||
594 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
595 /* the record has changed on us - we have to give up */
596 talloc_free(tmp_ctx);
599 talloc_free(tmp_ctx);
606 tdb_transaction_cancel(h->ctx->wtdb->tdb);
/*
  Commit the open transaction (the transaction_commit method of the
  db_context).

  Outline of the visible logic:
    - committing without an open transaction is logged at level 0
    - if a nested cancel was recorded, the transaction is cancelled instead
    - a non-zero nesting level is handled separately from the outermost
      commit (presumably by just decrementing it - not visible here)
    - the cancel-on-free destructor is removed from the handle
    - with an empty write log the local tdb transaction is simply
      cancelled and ctx->transaction cleared
    - otherwise ctdbd is asked to commit the write log to the other nodes
      (CTDB_CONTROL_TRANS2_COMMIT on the first attempt,
      CTDB_CONTROL_TRANS2_COMMIT_RETRY afterwards); on failure the local
      tdb transaction is cancelled, a failure control is chosen, and the
      transaction is replayed and retried, giving up on the fifth failure
    - on success the local tdb transaction is committed and ctdbd is told
      with CTDB_CONTROL_TRANS2_FINISHED; if the local commit fails the
      chosen failure control is sent instead

  All notifications to ctdbd other than the commit request itself are sent
  with CTDB_CTRL_FLAG_NOREPLY.

  NOTE(review): the return statements, the retry label/loop and the
  freeing of the handle are not visible in this listing.
*/
614 static int db_ctdb_transaction_commit(struct db_context *db)
616 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
622 struct db_ctdb_transaction_handle *h = ctx->transaction;
623 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
626 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
630 if (h->nested_cancel) {
631 db->transaction_cancel(db);
632 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
636 if (h->nesting != 0) {
641 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
643 talloc_set_destructor(h, NULL);
645 /* our commit strategy is quite complex.
647 - we first try to commit the changes to all other nodes
649 - if that works, then we commit locally and we are done
651 - if a commit on another node fails, then we need to cancel
652 the transaction, then restart the transaction (thus
653 opening a window of time for a pending recovery to
654 complete), then replay the transaction, checking all the
655 reads and writes (checking that reads give the same data,
656 and writes succeed). Then we retry the transaction to the
661 if (h->m_write == NULL) {
662 /* no changes were made, potentially after a retry */
663 tdb_transaction_cancel(h->ctx->wtdb->tdb);
665 ctx->transaction = NULL;
669 /* tell ctdbd to commit to the other nodes */
670 rets = ctdbd_control_local(messaging_ctdbd_connection(),
671 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
673 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
674 if (!NT_STATUS_IS_OK(rets) || status != 0) {
675 tdb_transaction_cancel(h->ctx->wtdb->tdb);
678 if (!NT_STATUS_IS_OK(rets)) {
679 failure_control = CTDB_CONTROL_TRANS2_ERROR;
681 /* work out what error code we will give if we
682 have to fail the operation */
683 switch ((enum ctdb_trans2_commit_error)status) {
684 case CTDB_TRANS2_COMMIT_SUCCESS:
685 case CTDB_TRANS2_COMMIT_SOMEFAIL:
686 case CTDB_TRANS2_COMMIT_TIMEOUT:
687 failure_control = CTDB_CONTROL_TRANS2_ERROR;
689 case CTDB_TRANS2_COMMIT_ALLFAIL:
690 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
/* bounded retry: the fifth failed attempt is final */
695 if (++retries == 5) {
696 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
697 h->ctx->db_id, retries, (unsigned)failure_control));
698 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
699 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
700 tdb_null, NULL, NULL, NULL);
701 h->ctx->transaction = NULL;
703 ctx->transaction = NULL;
/* restart the transaction and re-run the logged reads and writes */
707 if (ctdb_replay_transaction(h) != 0) {
708 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
709 (unsigned)failure_control));
710 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
711 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
712 tdb_null, NULL, NULL, NULL);
713 h->ctx->transaction = NULL;
715 ctx->transaction = NULL;
720 failure_control = CTDB_CONTROL_TRANS2_ERROR;
723 /* do the real commit locally */
724 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
726 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
727 (unsigned)failure_control));
728 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
729 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
730 h->ctx->transaction = NULL;
735 /* tell ctdbd that we are finished with our local commit */
736 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
737 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
738 tdb_null, NULL, NULL, NULL);
739 h->ctx->transaction = NULL;
/*
  Cancel the open transaction (the transaction_cancel method of the
  db_context).

  Cancelling without an open transaction is logged at level 0. When the
  transaction is nested (nesting != 0) the cancel is only recorded in
  h->nested_cancel, which makes the later commit fail; otherwise
  ctx->transaction is cleared.

  NOTE(review): the return statements, the nesting decrement and the
  freeing of the handle (whose destructor cancels the tdb transaction)
  are not visible in this listing.
*/
748 static int db_ctdb_transaction_cancel(struct db_context *db)
750 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
752 struct db_ctdb_transaction_handle *h = ctx->transaction;
755 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
759 if (h->nesting != 0) {
761 h->nested_cancel = true;
765 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
767 ctx->transaction = NULL;
/*
  store callback of records handed out by fetch_locked_internal.

  The ltdb header remembered in the record's private data is written back
  in front of the new data: both are copied into one malloc'ed buffer,
  stored in the local tdb under rec->key and the buffer is freed again.

  Returns NT_STATUS_NO_MEMORY if the buffer cannot be allocated,
  NT_STATUS_OK if tdb_store() returns 0, and otherwise the tdb error
  translated by tdb_error_to_ntstatus().

  NOTE(review): flag is ignored - the record is always written with
  TDB_REPLACE.
*/
773 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
775 struct db_ctdb_rec *crec = talloc_get_type_abort(
776 rec->private_data, struct db_ctdb_rec);
780 cdata.dsize = sizeof(crec->header) + data.dsize;
782 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
783 return NT_STATUS_NO_MEMORY;
/* stored layout: header first, payload directly behind it */
786 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
787 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
789 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
791 SAFE_FREE(cdata.dptr);
793 return (ret == 0) ? NT_STATUS_OK
794 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
/*
  delete_rec callback of records handed out by fetch_locked_internal.

  The record is not removed from the tdb; the delete is carried out by
  calling db_ctdb_store() so that the header is kept and stored with empty
  data.

  NOTE(review): the set-up of the data value passed to db_ctdb_store() is
  not visible in this listing.
*/
799 static NTSTATUS db_ctdb_delete(struct db_record *rec)
804 * We have to store the header with empty data. TODO: Fix the
810 return db_ctdb_store(rec, data, 0);
/*
  talloc destructor of records handed out by fetch_locked_internal:
  releases the tdb chain lock held on the record's key.

  A level 10 debug message names the db id and the hex-encoded key (cut to
  20 characters unless DEBUGLEVEL is above 10); a failing
  tdb_chainunlock() is logged at level 0.

  NOTE(review): the return statements are not visible in this listing.
*/
814 static int db_ctdb_record_destr(struct db_record* data)
816 struct db_ctdb_rec *crec = talloc_get_type_abort(
817 data->private_data, struct db_ctdb_rec);
819 DEBUG(10, (DEBUGLEVEL > 10
820 ? "Unlocking db %u key %s\n"
821 : "Unlocking db %u key %.20s\n",
822 (int)crec->ctdb_ctx->db_id,
823 hex_encode(data, (unsigned char *)data->key.dptr,
826 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
827 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
  Fetch a record with the tdb chain lock held, migrating it to this node
  through ctdbd first if necessary.

  Visible steps:
    - allocate the db_record on mem_ctx and, as its child, the private
      struct db_ctdb_rec; duplicate the key
    - take a blocking chain lock on the key in the local tdb
    - install the non-transactional store/delete callbacks and the
      unlocking destructor
    - fetch the record; unless it exists, is at least an ltdb header long
      and names this node as dmaster, drop the lock and the destructor,
      ask ctdbd to migrate the record (ctdbd_migrate) and try again
    - log at level 0 when more than 10 migration attempts were needed
    - keep the ltdb header in crec->header and hand back the bytes after
      it as the record value

  NOTE(review): the retry label/jump, the return statements, the cleanup
  on failure paths and the use of the fourth parameter (callers pass
  true or db->persistent) are not visible in this listing.
*/
834 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
839 struct db_record *result;
840 struct db_ctdb_rec *crec;
843 int migrate_attempts = 0;
845 if (!(result = talloc(mem_ctx, struct db_record))) {
846 DEBUG(0, ("talloc failed\n"));
850 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
851 DEBUG(0, ("talloc failed\n"));
856 result->private_data = (void *)crec;
857 crec->ctdb_ctx = ctx;
/* the record keeps its own copy of the key */
859 result->key.dsize = key.dsize;
860 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
861 if (result->key.dptr == NULL) {
862 DEBUG(0, ("talloc failed\n"));
868 * Do a blocking lock on the record
/* only build the hex key string when it will actually be printed */
872 if (DEBUGLEVEL >= 10) {
873 char *keystr = hex_encode(result, key.dptr, key.dsize);
874 DEBUG(10, (DEBUGLEVEL > 10
875 ? "Locking db %u key %s\n"
876 : "Locking db %u key %.20s\n",
877 (int)crec->ctdb_ctx->db_id, keystr));
881 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
882 DEBUG(3, ("tdb_chainlock failed\n"));
/* from here on freeing the record releases the chain lock */
887 result->store = db_ctdb_store;
888 result->delete_rec = db_ctdb_delete;
889 talloc_set_destructor(result, db_ctdb_record_destr);
891 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
894 * See if we have a valid record and we are the dmaster. If so, we can
895 * take the shortcut and just return it.
898 if ((ctdb_data.dptr == NULL) ||
899 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
900 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
/* NOTE(review): the random() term below would force a migration on about
   half of all calls; it looks like test scaffolding. The lines directly
   around it are not visible here - confirm that it is compiled out. */
902 || (random() % 2 != 0)
/* not usable locally: undo the lock and the destructor before asking
   ctdbd to migrate the record here */
905 SAFE_FREE(ctdb_data.dptr);
906 tdb_chainunlock(ctx->wtdb->tdb, key);
907 talloc_set_destructor(result, NULL);
909 migrate_attempts += 1;
911 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
912 ctdb_data.dptr, ctdb_data.dptr ?
913 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
916 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
917 if (!NT_STATUS_IS_OK(status)) {
918 DEBUG(5, ("ctdb_migrate failed: %s\n",
923 /* now its migrated, try again */
927 if (migrate_attempts > 10) {
928 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
/* remember the header so db_ctdb_store can write it back */
932 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
934 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
935 result->value.dptr = NULL;
/* copy the payload only when there is one */
937 if ((result->value.dsize != 0)
938 && !(result->value.dptr = (uint8 *)talloc_memdup(
939 result, ctdb_data.dptr + sizeof(crec->header),
940 result->value.dsize))) {
941 DEBUG(0, ("talloc failed\n"));
945 SAFE_FREE(ctdb_data.dptr);
/*
  The fetch_locked method of the db_context. Picks one of three paths:
    - a transaction is open: db_ctdb_fetch_locked_transaction()
    - persistent database without open transaction:
      db_ctdb_fetch_locked_persistent(), which wraps the access in its own
      transaction
    - otherwise: fetch_locked_internal() (chain lock plus migration)
*/
950 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
954 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
957 if (ctx->transaction != NULL) {
958 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
961 if (db->persistent) {
962 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
965 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
/*
  fetch (unlocked, no migration) operation on ctdb
*/
/*
  The fetch method of the db_context.

  With an open transaction the read goes through
  db_ctdb_transaction_fetch(). Otherwise the local tdb is tried first: if
  the record exists, is at least an ltdb header long and passes the
  dmaster test, the bytes after the header are copied onto mem_ctx and
  returned without involving ctdbd. In all other cases ctdbd is asked to
  fetch the record (ctdbd_fetch); a failure there is logged at level 5.

  NOTE(review): one line of the shortcut condition is not visible in this
  listing - going by the comment above it, presumably the bypass of the
  dmaster check for persistent databases. The return statements are not
  visible either.
*/
971 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
972 TDB_DATA key, TDB_DATA *data)
974 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
979 if (ctx->transaction) {
980 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
983 /* try a direct fetch */
984 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
987 * See if we have a valid record and we are the dmaster. If so, we can
988 * take the shortcut and just return it.
989 * we bypass the dmaster check for persistent databases
991 if ((ctdb_data.dptr != NULL) &&
992 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
994 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
995 /* we are the dmaster - avoid the ctdb protocol op */
997 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
/* header only, no payload: nothing to copy */
998 if (data->dsize == 0) {
999 SAFE_FREE(ctdb_data.dptr);
1004 data->dptr = (uint8 *)talloc_memdup(
1005 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1008 SAFE_FREE(ctdb_data.dptr);
1010 if (data->dptr == NULL) {
1016 SAFE_FREE(ctdb_data.dptr);
1018 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1019 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1020 if (!NT_STATUS_IS_OK(status)) {
1021 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
  Context handed to the traverse callbacks below.

  db - the database being traversed
  fn - the caller's per-record function

  NOTE(review): the callbacks also use state->private_data (the opaque
  argument passed on to fn); that member is not visible in this listing.
*/
1028 struct traverse_state {
1029 struct db_context *db;
1030 int (*fn)(struct db_record *rec, void *private_data);
/*
  Per-record callback given to ctdbd_traverse() by db_ctdb_traverse().

  The key is fetched again as a locked record (on a temporary talloc
  context) and the caller's function is invoked only if that record has a
  non-empty value. The data argument supplied by the traverse and the
  return value of fn are not used. Freeing the temporary context releases
  the record again.
*/
1034 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1036 struct traverse_state *state = (struct traverse_state *)private_data;
1037 struct db_record *rec;
1038 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1039 /* we have to give them a locked record to prevent races */
1040 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1041 if (rec && rec->value.dsize > 0) {
1042 state->fn(rec, state->private_data);
1044 talloc_free(tmp_ctx);
/*
  Per-record callback given to tdb_traverse() by db_ctdb_traverse() for
  persistent databases.

  The key is fetched again as a locked record (on a temporary talloc
  context) and the caller's function is invoked only if that record has a
  non-empty value; its result is kept in ret. Freeing the temporary
  context releases the record again.

  Since db_ctdb_fetch_locked() is used, a persistent database without an
  open transaction gets one transaction started and committed per record
  visited (see db_ctdb_fetch_locked_persistent).

  NOTE(review): the declaration/initial value of ret and the return
  statement are not visible in this listing.
*/
1047 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1050 struct traverse_state *state = (struct traverse_state *)private_data;
1051 struct db_record *rec;
1052 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1054 /* we have to give them a locked record to prevent races */
1055 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1056 if (rec && rec->value.dsize > 0) {
1057 ret = state->fn(rec, state->private_data);
1059 talloc_free(tmp_ctx);
/*
  The traverse method of the db_context: call fn for the records of the
  database, handing it locked records.

  Persistent databases are walked with a local tdb_traverse() and its
  result is returned; all other databases are walked through ctdbd with
  ctdbd_traverse().

  NOTE(review): the initialisation of state.db/state.fn and the return
  value of the non-persistent path are not visible in this listing.
*/
1063 static int db_ctdb_traverse(struct db_context *db,
1064 int (*fn)(struct db_record *rec,
1065 void *private_data),
1068 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1069 struct db_ctdb_ctx);
1070 struct traverse_state state;
1074 state.private_data = private_data;
1076 if (db->persistent) {
1077 /* for persistent databases we don't need to do a ctdb traverse,
1078 we can do a faster local traverse */
1079 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1083 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
/* store callback for records handed out by the read-only traverse: always
   refuses with NT_STATUS_MEDIA_WRITE_PROTECTED */
1087 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1089 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/* delete_rec callback for records handed out by the read-only traverse:
   always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED */
1092 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1094 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
  Per-record callback given to ctdbd_traverse() by db_ctdb_traverse_read().

  A db_record is built on the stack with the write-denying store/delete
  callbacks and the database as private_data, and passed to the caller's
  function; no lock is taken and the return value of fn is not used.

  NOTE(review): the assignments of rec.key and rec.value are not visible
  in this listing.
*/
1097 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1099 struct traverse_state *state = (struct traverse_state *)private_data;
1100 struct db_record rec;
1103 rec.store = db_ctdb_store_deny;
1104 rec.delete_rec = db_ctdb_delete_deny;
1105 rec.private_data = state->db;
1106 state->fn(&rec, state->private_data);
/*
  Per-record callback given to tdb_traverse_read() by
  db_ctdb_traverse_read() for persistent databases.

  A db_record is built on the stack with the write-denying store/delete
  callbacks. Records whose value is no larger than a ctdb_ltdb_header are
  treated as deleted; for all others the header is skipped and the
  caller's function is invoked, its result being returned.

  NOTE(review): the assignments of rec.key and rec.value and the return
  value used for deleted records are not visible in this listing.
*/
1109 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1112 struct traverse_state *state = (struct traverse_state *)private_data;
1113 struct db_record rec;
1116 rec.store = db_ctdb_store_deny;
1117 rec.delete_rec = db_ctdb_delete_deny;
1118 rec.private_data = state->db;
1120 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1121 /* a deleted record */
/* present the payload only, without the ltdb header */
1124 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1125 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1127 return state->fn(&rec, state->private_data);
/*
  The traverse_read method of the db_context: call fn for the records of
  the database, handing it records that refuse store and delete.

  Persistent databases are walked with a local tdb_traverse_read() and its
  result is returned; all other databases are walked through ctdbd with
  ctdbd_traverse().

  NOTE(review): the initialisation of state.db/state.fn and the return
  value of the non-persistent path are not visible in this listing.
*/
1130 static int db_ctdb_traverse_read(struct db_context *db,
1131 int (*fn)(struct db_record *rec,
1132 void *private_data),
1135 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1136 struct db_ctdb_ctx);
1137 struct traverse_state state;
1141 state.private_data = private_data;
1143 if (db->persistent) {
1144 /* for persistent databases we don't need to do a ctdb traverse,
1145 we can do a faster local traverse */
1146 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1149 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
/* The get_seqnum method of the db_context: returns the sequence number of
   the local tdb (tdb_get_seqnum) */
1153 static int db_ctdb_get_seqnum(struct db_context *db)
1155 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1156 struct db_ctdb_ctx);
1157 return tdb_get_seqnum(ctx->wtdb->tdb);
/*
  Open a database through ctdbd and return a db_context using the ctdb
  backend methods defined in this file.

  Visible steps:
    - bail out when clustering is disabled (lp_clustering())
    - allocate the db_context on mem_ctx and the backend context as its
      child
    - attach to the database in ctdbd (ctdbd_db_attach), which yields the
      db id, and ask ctdbd for the path of the local tdb (ctdbd_dbpath)
    - a database opened without TDB_CLEAR_IF_FIRST is treated as
      persistent
    - of the caller's tdb_flags only TDB_SEQNUM is passed on to the tdb
    - when O_CREAT was requested, apply mode to the file with chmod()
    - open the local tdb read/write and install the method table

  NOTE(review): the return value of chmod() is ignored.
  NOTE(review): no check of db_path against NULL is visible before it is
  passed to chmod() and tdb_wrap_open().
  NOTE(review): TALLOC_FREE(result) in the first allocation-failure branch
  runs on a pointer already known to be NULL; harmless but redundant.
  NOTE(review): the return statements and the end of the function are not
  visible in this listing.
*/
1160 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1162 int hash_size, int tdb_flags,
1163 int open_flags, mode_t mode)
1165 struct db_context *result;
1166 struct db_ctdb_ctx *db_ctdb;
1169 if (!lp_clustering()) {
1170 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1174 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1175 DEBUG(0, ("talloc failed\n"));
1176 TALLOC_FREE(result);
1180 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1181 DEBUG(0, ("talloc failed\n"));
1182 TALLOC_FREE(result);
/* db_ctdb is not zero-allocated, so its members are set explicitly */
1186 db_ctdb->transaction = NULL;
1187 db_ctdb->db = result;
1189 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1190 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1191 TALLOC_FREE(result);
1195 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1197 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1199 /* only pass through specific flags */
1200 tdb_flags &= TDB_SEQNUM;
1202 /* honor permissions if user has specified O_CREAT */
1203 if (open_flags & O_CREAT) {
1204 chmod(db_path, mode);
1207 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1208 if (db_ctdb->wtdb == NULL) {
1209 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1210 TALLOC_FREE(result);
1213 talloc_free(db_path);
/* install the ctdb backend methods */
1215 result->private_data = (void *)db_ctdb;
1216 result->fetch_locked = db_ctdb_fetch_locked;
1217 result->fetch = db_ctdb_fetch;
1218 result->traverse = db_ctdb_traverse;
1219 result->traverse_read = db_ctdb_traverse_read;
1220 result->get_seqnum = db_ctdb_get_seqnum;
1221 result->transaction_start = db_ctdb_transaction_start;
1222 result->transaction_commit = db_ctdb_transaction_commit;
1223 result->transaction_cancel = db_ctdb_transaction_cancel;
1225 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1226 name, db_ctdb->db_id));