2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state for a ctdb-backed database. Code later in this
 * file also reads h->nesting and h->nested_cancel on this handle; those
 * member declarations are not visible in this listing.
 */
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
34 struct ctdb_marshall_buffer *m_all;
35 struct ctdb_marshall_buffer *m_write;
/*
 * The next three members are the ones accessed elsewhere in this file as
 * ctx->db, ctx->wtdb and ctx->transaction on a struct db_ctdb_ctx (the
 * struct header line itself is not visible in this listing). ctx->db_id
 * is also used by the code but its declaration is not visible here.
 */
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
44 struct db_ctdb_transaction_handle *transaction;
/*
 * The next two members are accessed as crec->ctdb_ctx and crec->header on
 * a struct db_ctdb_rec, the private data attached to a locked record by
 * fetch_locked_internal().
 */
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
/*
 * Forward declaration: db_ctdb_transaction_fetch_start() calls this
 * function before its definition further down in the file.
 */
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
/*
 * Translate the error currently reported by tdb_error() for the given tdb
 * context into an NTSTATUS value. The visible assignments produce
 * NT_STATUS_OBJECT_NAME_COLLISION, NT_STATUS_OBJECT_NAME_NOT_FOUND or
 * NT_STATUS_INTERNAL_DB_CORRUPTION; the conditions selecting each of them
 * are not visible in this listing.
 */
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
60 enum TDB_ERROR tret = tdb_error(tdb);
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
79 * Store a record together with the ctdb record header
80 * in the local copy of the database.
/*
 * Write `header` immediately followed by `data` as a single value under
 * `key` in the local tdb, replacing any existing value (TDB_REPLACE).
 *
 * Returns NT_STATUS_NO_MEMORY if the scratch buffer cannot be allocated,
 * NT_STATUS_OK if tdb_store() returns 0, and otherwise the status that
 * tdb_error_to_ntstatus() derives from the tdb context.
 */
82 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
84 struct ctdb_ltdb_header *header,
/* scratch allocations below hang off this stackframe and go away with it */
87 TALLOC_CTX *tmp_ctx = talloc_stackframe();
/* stored value layout: [struct ctdb_ltdb_header][data.dsize payload bytes] */
91 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
92 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
94 if (rec.dptr == NULL) {
96 return NT_STATUS_NO_MEMORY;
99 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
100 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
102 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
104 talloc_free(tmp_ctx);
106 return (ret == 0) ? NT_STATUS_OK
107 : tdb_error_to_ntstatus(db->wtdb->tdb);
112 form a ctdb_rec_data record from a key/data pair
114 note that header may be NULL. If not NULL then it is included in the data portion
/*
 * Allocate one struct ctdb_rec_data on mem_ctx and fill it from a
 * key/data pair. The blob placed in d->data[] is the key bytes followed
 * by the value bytes; when a header is supplied the value part is the
 * header followed by the data.
 *
 * Two fill sequences are visible below (one with the header, one
 * without); the branch that chooses between them and the handling of an
 * allocation failure are not visible in this listing.
 */
117 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
119 struct ctdb_ltdb_header *header,
123 struct ctdb_rec_data *d;
/* fixed part of the record + key + data (+ header when one is given) */
125 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
126 data.dsize + (header?sizeof(*header):0);
127 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
133 d->keylen = key.dsize;
134 memcpy(&d->data[0], key.dptr, key.dsize);
/* layout with header: [key][header][data]; datalen counts header + data */
136 d->datalen = data.dsize + sizeof(*header);
137 memcpy(&d->data[key.dsize], header, sizeof(*header));
138 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
/* layout without header: [key][data] */
140 d->datalen = data.dsize;
141 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
147 /* helper function for marshalling multiple records */
/*
 * Append one marshalled record to the marshall buffer `m` and return the
 * (possibly moved) buffer. Callers in this file pass the previous buffer
 * in, store the result back and treat a NULL result as failure.
 *
 * The record is first built on the talloc_tos() stackframe, then the
 * buffer is grown by the record's talloc size and the record bytes are
 * copied to the old end of the buffer. The conditions guarding the
 * individual steps (including when a fresh buffer is created) are not
 * visible in this listing.
 */
148 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
149 struct ctdb_marshall_buffer *m,
153 struct ctdb_ltdb_header *header,
156 struct ctdb_rec_data *r;
157 size_t m_size, r_size;
158 struct ctdb_marshall_buffer *m2 = NULL;
160 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
/* a fresh buffer is zeroed and holds only the fixed part before data[] */
167 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
168 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
/* talloc tracks allocation sizes, so no separate length field is needed */
175 m_size = talloc_get_size(m);
176 r_size = talloc_get_size(r);
178 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
179 mem_ctx, m, m_size + r_size);
/* the new record starts exactly where the old buffer ended */
185 memcpy(m_size + (uint8_t *)m2, r, r_size);
194 /* we've finished marshalling, return a data blob with the marshalled records */
/*
 * Wrap a marshall buffer in a TDB_DATA: dptr points at the buffer itself
 * (no copy is made) and dsize is the buffer's talloc allocation size.
 */
195 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
198 data.dptr = (uint8_t *)m;
199 data.dsize = talloc_get_size(m);
204 loop over a marshalling buffer
206 - pass r==NULL to start
207 - loop the number of times indicated by m->count
/*
 * Step to the next record of a marshall buffer and decode it.
 *
 * The first record is taken from m->data[0]; subsequent ones are found by
 * advancing r->length bytes from the current record. key/data are set to
 * point into the buffer (nothing is copied). When `header` is non-NULL
 * the value part is taken to start with a struct ctdb_ltdb_header, which
 * is copied to *header and excluded from *data.
 *
 * The guards around the individual output parameters and the return
 * statements are not visible in this listing; the one call visible in this
 * file passes NULL for two of the arguments.
 */
209 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
211 struct ctdb_ltdb_header *header,
212 TDB_DATA *key, TDB_DATA *data)
215 r = (struct ctdb_rec_data *)&m->data[0];
/* records are packed back to back; length is the size of the whole record */
217 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
225 key->dptr = &r->data[0];
226 key->dsize = r->keylen;
/* the value bytes follow the key bytes inside r->data[] */
229 data->dptr = &r->data[r->keylen];
230 data->dsize = r->datalen;
231 if (header != NULL) {
/* NOTE(review): the datalen >= sizeof(*header) check appears only further
 * down; confirm data->dsize cannot wrap here for a short record. */
232 data->dptr += sizeof(*header);
233 data->dsize -= sizeof(*header);
237 if (header != NULL) {
238 if (r->datalen < sizeof(*header)) {
/* NOTE(review): reads the header through a cast of a byte pointer at an
 * arbitrary offset; a memcpy would avoid alignment assumptions - confirm. */
241 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
250 * CTDB transaction destructor
/*
 * talloc destructor installed on a transaction handle by
 * db_ctdb_transaction_start(): cancels the tdb transaction on the local
 * database. The value returned to talloc is not visible in this listing.
 */
252 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
254 tdb_transaction_cancel(h->ctx->wtdb->tdb);
259 * start a transaction on a ctdb database:
260 * - lock the transaction lock key
261 * - start the tdb transaction
/*
 * Begin the local part of a transaction: fetch-lock the record named
 * CTDB_TRANSACTION_LOCK_KEY, start a tdb transaction, then re-read the
 * lock record and verify that it exists, is at least header-sized and
 * that this node (get_my_vnn()) is its dmaster.
 *
 * When that verification fails, the fetched data is freed, the tdb
 * transaction is cancelled and the temporary context is released; what
 * follows (retry or error return) is not visible in this listing.
 */
263 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
265 struct db_record *rh;
268 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
270 struct db_ctdb_ctx *ctx = h->ctx;
/* the key is the name without its terminating NUL */
273 key.dptr = (uint8_t *)discard_const(keyname);
274 key.dsize = strlen(keyname);
/* NOTE(review): no NULL check on tmp_ctx is visible here - confirm */
277 tmp_ctx = talloc_new(h);
279 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
281 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
282 talloc_free(tmp_ctx);
287 ret = tdb_transaction_start(ctx->wtdb->tdb);
289 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
290 talloc_free(tmp_ctx);
/* re-check the lock record from inside the tdb transaction */
294 data = tdb_fetch(ctx->wtdb->tdb, key);
295 if ((data.dptr == NULL) ||
296 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
297 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
298 SAFE_FREE(data.dptr);
299 tdb_transaction_cancel(ctx->wtdb->tdb);
300 talloc_free(tmp_ctx);
304 SAFE_FREE(data.dptr);
305 talloc_free(tmp_ctx);
312 * CTDB dbwrap API: transaction_start function
313 * starts a transaction on a persistent database
/*
 * dbwrap transaction_start hook (installed by db_open_ctdb()).
 *
 * Logs an error for non-persistent databases. If a transaction is already
 * open on this context only its nesting counter is incremented.
 * Otherwise a zeroed handle is allocated as a talloc child of `db`, the
 * local transaction is started through db_ctdb_transaction_fetch_start(),
 * the cancel-on-free destructor is attached and the handle is recorded in
 * ctx->transaction. Return values are not visible in this listing.
 */
315 static int db_ctdb_transaction_start(struct db_context *db)
317 struct db_ctdb_transaction_handle *h;
319 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
322 if (!db->persistent) {
323 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
/* nested start: reuse the open transaction and just count the level */
328 if (ctx->transaction) {
329 ctx->transaction->nesting++;
333 h = talloc_zero(db, struct db_ctdb_transaction_handle);
335 DEBUG(0,(__location__ " oom for transaction handle\n"));
341 ret = db_ctdb_transaction_fetch_start(h);
/* from here on, freeing h cancels the underlying tdb transaction */
347 talloc_set_destructor(h, db_ctdb_transaction_destructor);
349 ctx->transaction = h;
351 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
359 fetch a record inside a transaction
/*
 * Fetch a record while a transaction is open on `db`.
 *
 * The value is read from the local tdb, the leading
 * struct ctdb_ltdb_header is stripped and the payload is handed back in
 * *data. The read is also appended to the handle's m_all buffer, tagged
 * with the value 1 (stores are tagged 0), which
 * ctdb_replay_transaction() uses to tell reads from writes.
 */
361 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
363 TDB_DATA key, TDB_DATA *data)
365 struct db_ctdb_transaction_handle *h = db->transaction;
367 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
369 if (data->dptr != NULL) {
370 uint8_t *oldptr = (uint8_t *)data->dptr;
/* NOTE(review): no visible check that dsize >= sizeof(header) before this
 * unsigned subtraction; a short record would wrap - confirm. */
371 data->dsize -= sizeof(struct ctdb_ltdb_header);
372 if (data->dsize == 0) {
375 data->dptr = (uint8 *)
377 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
381 if (data->dptr == NULL && data->dsize != 0) {
/* remember what was read so a replay can detect a changed record */
387 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
388 if (h->m_all == NULL) {
389 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
391 talloc_free(data->dptr);
/*
 * Forward declarations: db_ctdb_fetch_locked_transaction() installs these
 * two functions as the store/delete_rec hooks of the records it returns;
 * both are defined further down in this file.
 */
400 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
401 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
/*
 * Build a db_record for `key` while a transaction is open.
 *
 * The record is allocated on mem_ctx, carries the current transaction
 * handle as private data, owns a copy of the key and uses the
 * transaction-aware store/delete hooks. Its value is the payload of the
 * local tdb record with the struct ctdb_ltdb_header stripped, or
 * tdb_null when the key does not exist yet. No chain lock is taken in
 * the visible code.
 */
403 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
407 struct db_record *result;
410 if (!(result = talloc(mem_ctx, struct db_record))) {
411 DEBUG(0, ("talloc failed\n"));
415 result->private_data = ctx->transaction;
417 result->key.dsize = key.dsize;
418 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
419 if (result->key.dptr == NULL) {
420 DEBUG(0, ("talloc failed\n"));
425 result->store = db_ctdb_store_transaction;
426 result->delete_rec = db_ctdb_delete_transaction;
428 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
429 if (ctdb_data.dptr == NULL) {
430 /* create the record */
431 result->value = tdb_null;
/* NOTE(review): no visible check that ctdb_data.dsize >= sizeof(header)
 * before this subtraction - confirm short records cannot occur. */
435 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
436 result->value.dptr = NULL;
/* only duplicate the payload when there is one; empty stays NULL */
438 if ((result->value.dsize != 0)
439 && !(result->value.dptr = (uint8 *)talloc_memdup(
440 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
441 result->value.dsize))) {
442 DEBUG(0, ("talloc failed\n"));
/* tdb_fetch() memory is malloc'ed, hence SAFE_FREE rather than talloc_free */
446 SAFE_FREE(ctdb_data.dptr);
/*
 * talloc destructor attached (via a small child object holding the record
 * pointer) to records handed out by db_ctdb_fetch_locked_persistent():
 * when the record is released, the transaction that was auto-started for
 * it is committed through the db's transaction_commit hook. A failed
 * commit is logged.
 */
451 static int db_ctdb_record_destructor(struct db_record **recp)
453 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
454 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
455 rec->private_data, struct db_ctdb_transaction_handle);
456 int ret = h->ctx->db->transaction_commit(h->ctx->db);
458 DEBUG(0,(__location__ " transaction_commit failed\n"));
464 auto-create a transaction for persistent databases
/*
 * fetch_locked for a persistent database when no transaction is open:
 * start a transaction, fetch the record inside it, and tie the
 * transaction's commit to the lifetime of the returned record.
 * If fetching the record or allocating the helper object fails, the
 * transaction is cancelled again.
 */
466 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
471 struct db_record *rec, **recp;
473 res = db_ctdb_transaction_start(ctx->db);
478 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
480 ctx->db->transaction_cancel(ctx->db);
484 /* destroy this transaction when we release the lock */
/* recp is a talloc child of rec, so it is destroyed together with rec */
485 recp = talloc(rec, struct db_record *);
487 ctx->db->transaction_cancel(ctx->db);
492 talloc_set_destructor(recp, db_ctdb_record_destructor);
498 stores a record inside a transaction
/*
 * Store `data` under `key` as part of the open transaction `h`.
 *
 * The existing record is read to obtain its ctdb header; a record that
 * does not exist gets a new header. Writing data identical to what is
 * already stored is treated as a special case. Otherwise this node is
 * made dmaster in the header, the write is appended to both marshall
 * buffers (m_all without the header, m_write with it, both tagged 0) and
 * the record is written to the local tdb via db_ctdb_ltdb_store().
 *
 * Return values and the RSN update mentioned below are not visible in
 * this listing.
 */
500 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
501 TDB_DATA key, TDB_DATA data)
/* NOTE(review): no NULL check on tmp_ctx is visible - confirm */
503 TALLOC_CTX *tmp_ctx = talloc_new(h);
506 struct ctdb_ltdb_header header;
509 /* we need the header so we can update the RSN */
510 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
511 if (rec.dptr == NULL) {
512 /* the record doesn't exist - create one with us as dmaster.
513 This is only safe because we are in a transaction and this
514 is a persistent database */
/* NOTE(review): no visible check that rec.dsize >= sizeof(header) before
 * the copy and subtraction below, and no visible free of rec.dptr (malloc'ed
 * by tdb_fetch) - confirm against the full source. */
517 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
518 rec.dsize -= sizeof(struct ctdb_ltdb_header);
519 /* a special case, we are writing the same data that is there now */
520 if (data.dsize == rec.dsize &&
521 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
523 talloc_free(tmp_ctx);
529 header.dmaster = get_my_vnn();
/* m_all: the full read/write log consulted by ctdb_replay_transaction() */
533 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
534 if (h->m_all == NULL) {
535 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
536 talloc_free(tmp_ctx);
/* m_write: writes only, with header; this is what the commit sends to ctdbd */
541 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
542 if (h->m_write == NULL) {
543 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
544 talloc_free(tmp_ctx);
548 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
549 if (NT_STATUS_IS_OK(status)) {
555 talloc_free(tmp_ctx);
562 a record store inside a transaction
/*
 * db_record store hook for records fetched inside a transaction: forwards
 * the record's key and the new data to db_ctdb_transaction_store(). One
 * visible return path reports the tdb error as an NTSTATUS. The `flag`
 * argument is not used in the visible code.
 */
564 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
566 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
567 rec->private_data, struct db_ctdb_transaction_handle);
570 ret = db_ctdb_transaction_store(h, rec->key, data);
572 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
578 a record delete inside a transaction
/*
 * db_record delete hook for records fetched inside a transaction: a
 * delete is expressed as a store of tdb_null (empty data) under the
 * record's key. One visible return path reports the tdb error as an
 * NTSTATUS.
 */
580 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
582 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
583 rec->private_data, struct db_ctdb_transaction_handle);
586 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
588 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
 * Re-run a transaction from its m_all log after a failed remote commit.
 *
 * The old write buffer is freed, the local transaction is restarted and
 * every logged operation is visited in order: entries tagged 0 are
 * stored again, the others are re-fetched and compared byte for byte
 * with the logged data. A mismatch means the record changed in the
 * meantime and the replay gives up. The tdb transaction is cancelled on
 * a path whose label is not visible in this listing.
 */
597 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
600 struct ctdb_rec_data *rec = NULL;
/* the writes are re-collected into a new m_write by the stores below */
603 talloc_free(h->m_write);
606 ret = db_ctdb_transaction_fetch_start(h);
611 for (i=0;i<h->m_all->count;i++) {
614 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
616 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
/* reqid 0 marks a logged write, anything else a logged read */
620 if (rec->reqid == 0) {
622 if (db_ctdb_transaction_store(h, key, data) != 0) {
627 TALLOC_CTX *tmp_ctx = talloc_new(h);
629 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
630 talloc_free(tmp_ctx);
633 if (data2.dsize != data.dsize ||
634 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
635 /* the record has changed on us - we have to give up */
636 talloc_free(tmp_ctx);
639 talloc_free(tmp_ctx);
646 tdb_transaction_cancel(h->ctx->wtdb->tdb);
/*
 * dbwrap transaction_commit hook (installed by db_open_ctdb()).
 *
 * Visible flow:
 *  - complain if no transaction is open;
 *  - if a nested level was cancelled, cancel the whole transaction;
 *  - if this is a nested level (nesting != 0) the outer transaction stays
 *    open (the statements in that branch are not visible here);
 *  - remove the cancel-on-free destructor from the handle;
 *  - with no recorded writes, cancel the local tdb transaction and clear
 *    ctx->transaction;
 *  - otherwise send the marshalled writes to ctdbd, and on failure cancel
 *    locally, replay the transaction and try again, giving up when the
 *    retry counter reaches 5;
 *  - finally commit the local tdb transaction and notify ctdbd with
 *    CTDB_CONTROL_TRANS2_FINISHED.
 *
 * Return values and the retry jump are not visible in this listing.
 */
654 static int db_ctdb_transaction_commit(struct db_context *db)
656 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
662 struct db_ctdb_transaction_handle *h = ctx->transaction;
/* control sent to ctdbd when the commit finally has to be abandoned */
663 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
666 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
670 if (h->nested_cancel) {
671 db->transaction_cancel(db);
672 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
676 if (h->nesting != 0) {
681 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
/* from here the tdb transaction is ended explicitly, not by freeing h */
683 talloc_set_destructor(h, NULL);
685 /* our commit strategy is quite complex.
687 - we first try to commit the changes to all other nodes
689 - if that works, then we commit locally and we are done
691 - if a commit on another node fails, then we need to cancel
692 the transaction, then restart the transaction (thus
693 opening a window of time for a pending recovery to
694 complete), then replay the transaction, checking all the
695 reads and writes (checking that reads give the same data,
696 and writes succeed). Then we retry the transaction to the
701 if (h->m_write == NULL) {
702 /* no changes were made, potentially after a retry */
703 tdb_transaction_cancel(h->ctx->wtdb->tdb);
705 ctx->transaction = NULL;
709 /* tell ctdbd to commit to the other nodes */
/* the first attempt uses TRANS2_COMMIT, later attempts TRANS2_COMMIT_RETRY */
710 rets = ctdbd_control_local(messaging_ctdbd_connection(),
711 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
713 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
/* rets: did the control call itself work; status: result code it returned */
714 if (!NT_STATUS_IS_OK(rets) || status != 0) {
715 tdb_transaction_cancel(h->ctx->wtdb->tdb);
718 if (!NT_STATUS_IS_OK(rets)) {
719 failure_control = CTDB_CONTROL_TRANS2_ERROR;
721 /* work out what error code we will give if we
722 have to fail the operation */
723 switch ((enum ctdb_trans2_commit_error)status) {
724 case CTDB_TRANS2_COMMIT_SUCCESS:
725 case CTDB_TRANS2_COMMIT_SOMEFAIL:
726 case CTDB_TRANS2_COMMIT_TIMEOUT:
727 failure_control = CTDB_CONTROL_TRANS2_ERROR;
729 case CTDB_TRANS2_COMMIT_ALLFAIL:
730 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
735 if (++retries == 5) {
736 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
737 h->ctx->db_id, retries, (unsigned)failure_control));
738 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
739 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
740 tdb_null, NULL, NULL, NULL);
741 h->ctx->transaction = NULL;
743 ctx->transaction = NULL;
/* restart the local transaction and re-apply the logged operations */
747 if (ctdb_replay_transaction(h) != 0) {
748 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
749 (unsigned)failure_control));
750 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
751 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
752 tdb_null, NULL, NULL, NULL);
753 h->ctx->transaction = NULL;
755 ctx->transaction = NULL;
760 failure_control = CTDB_CONTROL_TRANS2_ERROR;
763 /* do the real commit locally */
764 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
766 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
767 (unsigned)failure_control));
768 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
769 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
770 h->ctx->transaction = NULL;
775 /* tell ctdbd that we are finished with our local commit */
776 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
777 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
778 tdb_null, NULL, NULL, NULL);
779 h->ctx->transaction = NULL;
/*
 * dbwrap transaction_cancel hook (installed by db_open_ctdb()).
 *
 * Complains if no transaction is open. Cancelling a nested level only
 * marks the handle with nested_cancel, which makes the eventual outer
 * commit fail (see db_ctdb_transaction_commit()). Cancelling the
 * outermost level clears ctx->transaction; how the handle itself is
 * released is not visible in this listing.
 */
788 static int db_ctdb_transaction_cancel(struct db_context *db)
790 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
792 struct db_ctdb_transaction_handle *h = ctx->transaction;
795 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
799 if (h->nesting != 0) {
801 h->nested_cancel = true;
805 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
807 ctx->transaction = NULL;
/*
 * db_record store hook for records returned by fetch_locked_internal():
 * writes the data to the local tdb together with the ctdb header that
 * was captured when the record was fetched. The `flag` argument is not
 * used in the visible code.
 */
813 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
815 struct db_ctdb_rec *crec = talloc_get_type_abort(
816 rec->private_data, struct db_ctdb_rec);
818 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
/*
 * db_record delete hook for records returned by fetch_locked_internal():
 * implemented as a db_ctdb_store() call with flag 0. Per the comment
 * below the stored data is empty, so only the header remains; the
 * initialisation of `data` is not visible in this listing.
 */
823 static NTSTATUS db_ctdb_delete(struct db_record *rec)
828 * We have to store the header with empty data. TODO: Fix the
834 return db_ctdb_store(rec, data, 0);
/*
 * talloc destructor for records returned by fetch_locked_internal():
 * releases the tdb chain lock that was taken on the record's key and
 * logs if the unlock fails.
 */
838 static int db_ctdb_record_destr(struct db_record* data)
840 struct db_ctdb_rec *crec = talloc_get_type_abort(
841 data->private_data, struct db_ctdb_rec);
/* the key is logged hex-encoded; at debug level <= 10 only 20 chars of it */
843 DEBUG(10, (DEBUGLEVEL > 10
844 ? "Unlocking db %u key %s\n"
845 : "Unlocking db %u key %.20s\n",
846 (int)crec->ctdb_ctx->db_id,
847 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
850 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
851 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Fetch a record with its tdb chain lock held, migrating it to this node
 * first when necessary.
 *
 * A db_record plus a db_ctdb_rec (private data) are allocated on mem_ctx
 * and the key is copied. The chain for the key is locked and the record
 * read from the local tdb. If the record is missing, shorter than a ctdb
 * header, or this node is not its dmaster, the lock is dropped, ctdbd is
 * asked to migrate the record and the attempt is repeated. On success the
 * ctdb header is saved in the private data and the payload is copied
 * into result->value. The lock is released by the record's destructor.
 *
 * The fourth parameter and the return statements are not visible in this
 * listing.
 */
858 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
863 struct db_record *result;
864 struct db_ctdb_rec *crec;
867 int migrate_attempts = 0;
869 if (!(result = talloc(mem_ctx, struct db_record))) {
870 DEBUG(0, ("talloc failed\n"));
874 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
875 DEBUG(0, ("talloc failed\n"));
880 result->private_data = (void *)crec;
881 crec->ctdb_ctx = ctx;
883 result->key.dsize = key.dsize;
884 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
885 if (result->key.dptr == NULL) {
886 DEBUG(0, ("talloc failed\n"));
892 * Do a blocking lock on the record
/* build the hex key string only when it will actually be logged */
896 if (DEBUGLEVEL >= 10) {
897 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
898 DEBUG(10, (DEBUGLEVEL > 10
899 ? "Locking db %u key %s\n"
900 : "Locking db %u key %.20s\n",
901 (int)crec->ctdb_ctx->db_id, keystr));
905 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
906 DEBUG(3, ("tdb_chainlock failed\n"));
911 result->store = db_ctdb_store;
912 result->delete_rec = db_ctdb_delete;
/* with the lock held, freeing the record must release it */
913 talloc_set_destructor(result, db_ctdb_record_destr);
915 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
918 * See if we have a valid record and we are the dmaster. If so, we can
919 * take the shortcut and just return it.
922 if ((ctdb_data.dptr == NULL) ||
923 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
924 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
/* NOTE(review): if the next line is compiled in, the shortcut is skipped at
 * random; presumably it sits inside a preprocessor conditional that is not
 * visible in this listing - confirm. */
926 || (random() % 2 != 0)
/* slow path: drop the lock (and the destructor that would unlock again)
 * before asking ctdbd to migrate the record to this node */
929 SAFE_FREE(ctdb_data.dptr);
930 tdb_chainunlock(ctx->wtdb->tdb, key);
931 talloc_set_destructor(result, NULL);
933 migrate_attempts += 1;
/* NOTE(review): ctdb_data.dptr was just freed and set to NULL by SAFE_FREE
 * above, so this message can no longer show the fetched dmaster - confirm */
935 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
936 ctdb_data.dptr, ctdb_data.dptr ?
937 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
940 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
941 if (!NT_STATUS_IS_OK(status)) {
942 DEBUG(5, ("ctdb_migrate failed: %s\n",
947 /* now it's migrated, try again */
/* an unusually high number of migration rounds is worth a level-0 log */
951 if (migrate_attempts > 10) {
952 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
/* fast path: keep the header for later stores, hand out only the payload */
956 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
958 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
959 result->value.dptr = NULL;
961 if ((result->value.dsize != 0)
962 && !(result->value.dptr = (uint8 *)talloc_memdup(
963 result, ctdb_data.dptr + sizeof(crec->header),
964 result->value.dsize))) {
965 DEBUG(0, ("talloc failed\n"));
969 SAFE_FREE(ctdb_data.dptr);
/*
 * dbwrap fetch_locked hook (installed by db_open_ctdb()). Dispatches on
 * the database state:
 *  - a transaction is open: fetch inside that transaction;
 *  - persistent database without a transaction: auto-create one;
 *  - otherwise: chain-lock (and migrate if needed) through
 *    fetch_locked_internal().
 */
974 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
978 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
981 if (ctx->transaction != NULL) {
982 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
985 if (db->persistent) {
986 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
/* db->persistent is necessarily false here: the persistent case returned above */
989 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
993 fetch (unlocked, no migration) operation on ctdb
/*
 * dbwrap fetch hook (installed by db_open_ctdb()): read a record without
 * locking it.
 *
 * Inside a transaction the transactional fetch is used. Otherwise the
 * record is read straight from the local tdb and, when it is valid and
 * the dmaster condition below holds, its payload (header stripped) is
 * copied onto mem_ctx and returned in *data. In all other cases ctdbd is
 * asked to fetch the record. Return values are not visible in this
 * listing.
 */
995 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
996 TDB_DATA key, TDB_DATA *data)
998 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1003 if (ctx->transaction) {
1004 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1007 /* try a direct fetch */
1008 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1011 * See if we have a valid record and we are the dmaster. If so, we can
1012 * take the shortcut and just return it.
1013 * we bypass the dmaster check for persistent databases
1015 if ((ctdb_data.dptr != NULL) &&
1016 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1018 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1019 /* we are the dmaster - avoid the ctdb protocol op */
1021 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
/* header-only record: nothing to copy */
1022 if (data->dsize == 0) {
1023 SAFE_FREE(ctdb_data.dptr);
1028 data->dptr = (uint8 *)talloc_memdup(
1029 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1032 SAFE_FREE(ctdb_data.dptr);
1034 if (data->dptr == NULL) {
1040 SAFE_FREE(ctdb_data.dptr);
1042 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1043 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1044 if (!NT_STATUS_IS_OK(status)) {
1045 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * Context handed to the traverse callbacks below: the database being
 * walked and the user callback to invoke per record. The callbacks also
 * read state->private_data (passed through to fn); that member's
 * declaration is not visible in this listing.
 */
1052 struct traverse_state {
1053 struct db_context *db;
1054 int (*fn)(struct db_record *rec, void *private_data);
/*
 * Per-record callback for ctdbd_traverse() in the read-write traverse:
 * re-fetches the record locked and passes it to the user callback.
 * Records that cannot be fetched or have an empty value are skipped.
 * The `data` delivered by the traverse is not used in the visible code,
 * and the user callback's return value is ignored.
 */
1058 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1060 struct traverse_state *state = (struct traverse_state *)private_data;
1061 struct db_record *rec;
/* the locked record lives on tmp_ctx; freeing it below releases the record */
1062 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1063 /* we have to give them a locked record to prevent races */
1064 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1065 if (rec && rec->value.dsize > 0) {
1066 state->fn(rec, state->private_data);
1068 talloc_free(tmp_ctx);
/*
 * Per-record callback for tdb_traverse() on a persistent database:
 * re-fetches the record locked and passes it to the user callback,
 * keeping the callback's result in `ret`. Records that cannot be fetched
 * or have an empty value are skipped. The initial value of `ret` and the
 * return statement are not visible in this listing.
 */
1071 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1074 struct traverse_state *state = (struct traverse_state *)private_data;
1075 struct db_record *rec;
1076 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1078 /* we have to give them a locked record to prevent races */
1079 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1080 if (rec && rec->value.dsize > 0) {
1081 ret = state->fn(rec, state->private_data);
1083 talloc_free(tmp_ctx);
/*
 * dbwrap traverse hook (installed by db_open_ctdb()): call fn for every
 * record, handing it a locked record.
 *
 * Persistent databases are walked with a local tdb_traverse() and that
 * call's result is returned. Other databases are walked through
 * ctdbd_traverse(); the value returned on that path is not visible in
 * this listing.
 */
1087 static int db_ctdb_traverse(struct db_context *db,
1088 int (*fn)(struct db_record *rec,
1089 void *private_data),
1092 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1093 struct db_ctdb_ctx);
1094 struct traverse_state state;
1098 state.private_data = private_data;
1100 if (db->persistent) {
1101 /* for persistent databases we don't need to do a ctdb traverse,
1102 we can do a faster local traverse */
1103 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1107 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
/*
 * store hook for records handed out by the read-only traverse: always
 * refuses with NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1111 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1113 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * delete_rec hook for records handed out by the read-only traverse:
 * always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1116 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1118 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Per-record callback for ctdbd_traverse() in the read-only traverse:
 * presents the record to the user callback through a stack-allocated
 * db_record whose store/delete hooks reject all writes. How rec.key and
 * rec.value are filled is not visible in this listing. The user
 * callback's return value is ignored.
 */
1121 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1123 struct traverse_state *state = (struct traverse_state *)private_data;
1124 struct db_record rec;
1127 rec.store = db_ctdb_store_deny;
1128 rec.delete_rec = db_ctdb_delete_deny;
1129 rec.private_data = state->db;
1130 state->fn(&rec, state->private_data);
/*
 * Per-record callback for tdb_traverse_read() on a persistent database:
 * presents the record to the user callback through a stack-allocated
 * db_record whose store/delete hooks reject all writes, and returns the
 * callback's result.
 *
 * Values no larger than a ctdb header are treated as deleted records (the
 * action taken for them is not visible in this listing); for the others
 * the header is skipped so the callback sees only the payload. How
 * rec.key and rec.value are initially filled is not visible either.
 */
1133 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1136 struct traverse_state *state = (struct traverse_state *)private_data;
1137 struct db_record rec;
1140 rec.store = db_ctdb_store_deny;
1141 rec.delete_rec = db_ctdb_delete_deny;
1142 rec.private_data = state->db;
1144 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1145 /* a deleted record */
/* step over the ctdb header; value now points into the traversed buffer */
1148 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1149 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1151 return state->fn(&rec, state->private_data);
/*
 * dbwrap traverse_read hook (installed by db_open_ctdb()): call fn for
 * every record with a record object that rejects stores and deletes.
 *
 * Persistent databases are walked with a local tdb_traverse_read() and
 * that call's result is returned. Other databases are walked through
 * ctdbd_traverse(); the value returned on that path is not visible in
 * this listing.
 */
1154 static int db_ctdb_traverse_read(struct db_context *db,
1155 int (*fn)(struct db_record *rec,
1156 void *private_data),
1159 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1160 struct db_ctdb_ctx);
1161 struct traverse_state state;
1165 state.private_data = private_data;
1167 if (db->persistent) {
1168 /* for persistent databases we don't need to do a ctdb traverse,
1169 we can do a faster local traverse */
1170 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1173 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
/*
 * dbwrap get_seqnum hook (installed by db_open_ctdb()): returns
 * tdb_get_seqnum() of the local tdb behind this database.
 */
1177 static int db_ctdb_get_seqnum(struct db_context *db)
1179 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1180 struct db_ctdb_ctx);
1181 return tdb_get_seqnum(ctx->wtdb->tdb);
/*
 * dbwrap get_flags hook (installed by db_open_ctdb()): returns
 * tdb_get_flags() of the local tdb behind this database.
 */
1184 static int db_ctdb_get_flags(struct db_context *db)
1186 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1187 struct db_ctdb_ctx);
1188 return tdb_get_flags(ctx->wtdb->tdb);
1191 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1193 int hash_size, int tdb_flags,
1194 int open_flags, mode_t mode)
1196 struct db_context *result;
1197 struct db_ctdb_ctx *db_ctdb;
1200 if (!lp_clustering()) {
1201 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1205 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1206 DEBUG(0, ("talloc failed\n"));
1207 TALLOC_FREE(result);
1211 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1212 DEBUG(0, ("talloc failed\n"));
1213 TALLOC_FREE(result);
1217 db_ctdb->transaction = NULL;
1218 db_ctdb->db = result;
1220 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1221 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1222 TALLOC_FREE(result);
1226 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1228 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1230 /* only pass through specific flags */
1231 tdb_flags &= TDB_SEQNUM;
1233 /* honor permissions if user has specified O_CREAT */
1234 if (open_flags & O_CREAT) {
1235 chmod(db_path, mode);
1238 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1239 if (db_ctdb->wtdb == NULL) {
1240 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1241 TALLOC_FREE(result);
1244 talloc_free(db_path);
1246 result->private_data = (void *)db_ctdb;
1247 result->fetch_locked = db_ctdb_fetch_locked;
1248 result->fetch = db_ctdb_fetch;
1249 result->traverse = db_ctdb_traverse;
1250 result->traverse_read = db_ctdb_traverse_read;
1251 result->get_seqnum = db_ctdb_get_seqnum;
1252 result->get_flags = db_ctdb_get_flags;
1253 result->transaction_start = db_ctdb_transaction_start;
1254 result->transaction_commit = db_ctdb_transaction_commit;
1255 result->transaction_cancel = db_ctdb_transaction_cancel;
1257 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1258 name, db_ctdb->db_id));