2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
29 /* we store the reads and writes done under a transaction: one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
37 struct db_context *db;
38 struct tdb_wrap *wtdb;
40 struct db_ctdb_transaction_handle *transaction;
44 struct db_ctdb_ctx *ctdb_ctx;
45 struct ctdb_ltdb_header header;
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 enum TDB_ERROR tret = tdb_error(tdb);
60 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
76 form a ctdb_rec_data record from a key/data pair
78 note that header may be NULL. If not NULL then it is included in the data portion
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
83 struct ctdb_ltdb_header *header,
87 struct ctdb_rec_data *d;
89 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
90 data.dsize + (header?sizeof(*header):0);
91 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
97 d->keylen = key.dsize;
98 memcpy(&d->data[0], key.dptr, key.dsize);
100 d->datalen = data.dsize + sizeof(*header);
101 memcpy(&d->data[key.dsize], header, sizeof(*header));
102 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
104 d->datalen = data.dsize;
105 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
113 struct ctdb_marshall_buffer *m,
117 struct ctdb_ltdb_header *header,
120 struct ctdb_rec_data *r;
121 size_t m_size, r_size;
122 struct ctdb_marshall_buffer *m2;
124 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
131 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
138 m_size = talloc_get_size(m);
139 r_size = talloc_get_size(r);
141 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
147 memcpy(m_size + (uint8_t *)m2, r, r_size);
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
160 data.dptr = (uint8_t *)m;
161 data.dsize = talloc_get_size(m);
166 loop over a marshalling buffer
168 - pass r==NULL to start
169 - loop the number of times indicated by m->count
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
173 struct ctdb_ltdb_header *header,
174 TDB_DATA *key, TDB_DATA *data)
177 r = (struct ctdb_rec_data *)&m->data[0];
179 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
187 key->dptr = &r->data[0];
188 key->dsize = r->keylen;
191 data->dptr = &r->data[r->keylen];
192 data->dsize = r->datalen;
193 if (header != NULL) {
194 data->dptr += sizeof(*header);
195 data->dsize -= sizeof(*header);
199 if (header != NULL) {
200 if (r->datalen < sizeof(*header)) {
203 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
211 /* destructor for a transaction handle: cancel the underlying tdb transaction */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
214 tdb_transaction_cancel(h->ctx->wtdb->tdb);
218 /* take the transaction lock record and start the underlying tdb transaction */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
221 struct db_record *rh;
224 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
226 struct db_ctdb_ctx *ctx = h->ctx;
229 key.dptr = discard_const(keyname);
230 key.dsize = strlen(keyname);
233 tmp_ctx = talloc_new(h);
235 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
237 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
238 talloc_free(tmp_ctx);
243 ret = tdb_transaction_start(ctx->wtdb->tdb);
245 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246 talloc_free(tmp_ctx);
250 data = tdb_fetch(ctx->wtdb->tdb, key);
251 if ((data.dptr == NULL) ||
252 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254 SAFE_FREE(data.dptr);
255 tdb_transaction_cancel(ctx->wtdb->tdb);
256 talloc_free(tmp_ctx);
260 SAFE_FREE(data.dptr);
261 talloc_free(tmp_ctx);
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
270 struct db_ctdb_transaction_handle *h;
272 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
275 if (!db->persistent) {
276 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
281 if (ctx->transaction) {
282 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
286 h = talloc_zero(db, struct db_ctdb_transaction_handle);
288 DEBUG(0,(__location__ " oom for transaction handle\n"));
294 ret = db_ctdb_transaction_fetch_start(h);
300 talloc_set_destructor(h, db_ctdb_transaction_destructor);
302 ctx->transaction = h;
304 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
312 fetch a record inside a transaction
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
316 TDB_DATA key, TDB_DATA *data)
318 struct db_ctdb_transaction_handle *h = db->transaction;
320 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
322 if (data->dptr != NULL) {
323 uint8_t *oldptr = (uint8_t *)data->dptr;
324 data->dsize -= sizeof(struct ctdb_ltdb_header);
325 if (data->dsize == 0) {
328 data->dptr = (uint8 *)
330 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
334 if (data->dptr == NULL && data->dsize != 0) {
340 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341 if (h->m_all == NULL) {
342 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
344 talloc_free(data->dptr);
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
360 struct db_record *result;
363 if (!(result = talloc(mem_ctx, struct db_record))) {
364 DEBUG(0, ("talloc failed\n"));
368 result->private_data = ctx->transaction;
370 result->key.dsize = key.dsize;
371 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372 if (result->key.dptr == NULL) {
373 DEBUG(0, ("talloc failed\n"));
378 result->store = db_ctdb_store_transaction;
379 result->delete_rec = db_ctdb_delete_transaction;
381 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382 if (ctdb_data.dptr == NULL) {
383 /* create the record */
384 result->value = tdb_null;
388 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389 result->value.dptr = NULL;
391 if ((result->value.dsize != 0)
392 && !(result->value.dptr = (uint8 *)talloc_memdup(
393 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394 result->value.dsize))) {
395 DEBUG(0, ("talloc failed\n"));
399 SAFE_FREE(ctdb_data.dptr);
404 static int db_ctdb_record_destructor(struct db_record *rec)
406 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407 rec->private_data, struct db_ctdb_transaction_handle);
408 int ret = h->ctx->db->transaction_commit(h->ctx->db);
410 DEBUG(0,(__location__ " transaction_commit failed\n"));
416 auto-create a transaction for persistent databases
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
423 struct db_record *rec;
425 res = db_ctdb_transaction_start(ctx->db);
430 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
432 ctx->db->transaction_cancel(ctx->db);
436 /* commit this transaction when the record is freed */
437 talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
443 stores a record inside a transaction
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
446 TDB_DATA key, TDB_DATA data)
448 TALLOC_CTX *tmp_ctx = talloc_new(h);
451 struct ctdb_ltdb_header header;
453 /* we need the header so we can update the RSN */
454 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455 if (rec.dptr == NULL) {
456 /* the record doesn't exist - create one with us as dmaster.
457 This is only safe because we are in a transaction and this
458 is a persistent database */
460 header.dmaster = get_my_vnn();
462 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
469 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
470 if (h->m_all == NULL) {
471 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
472 talloc_free(tmp_ctx);
476 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
477 if (h->m_write == NULL) {
478 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
479 talloc_free(tmp_ctx);
484 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
485 rec.dptr = talloc_size(tmp_ctx, rec.dsize);
486 if (rec.dptr == NULL) {
487 DEBUG(0,(__location__ " Failed to alloc record\n"));
488 talloc_free(tmp_ctx);
491 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
492 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
494 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
496 talloc_free(tmp_ctx);
503 a record store inside a transaction
505 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
507 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
508 rec->private_data, struct db_ctdb_transaction_handle);
511 ret = db_ctdb_transaction_store(h, rec->key, data);
513 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
519 a record delete inside a transaction
521 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
523 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
524 rec->private_data, struct db_ctdb_transaction_handle);
527 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
529 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
538 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
541 struct ctdb_rec_data *rec = NULL;
545 ret = db_ctdb_transaction_fetch_start(h);
550 for (i=0;i<h->m_all->count;i++) {
553 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
555 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
559 if (rec->reqid == 0) {
561 if (db_ctdb_transaction_store(h, key, data) != 0) {
566 TALLOC_CTX *tmp_ctx = talloc_new(h);
568 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
569 talloc_free(tmp_ctx);
572 if (data2.dsize != data.dsize ||
573 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
574 /* the record has changed on us - we have to give up */
575 talloc_free(tmp_ctx);
578 talloc_free(tmp_ctx);
585 tdb_transaction_cancel(h->ctx->wtdb->tdb);
593 static int db_ctdb_transaction_commit(struct db_context *db)
595 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
601 struct db_ctdb_transaction_handle *h = ctx->transaction;
604 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
608 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
610 if (h->m_write == NULL) {
611 /* no changes were made */
613 ctx->transaction = NULL;
617 talloc_set_destructor(h, NULL);
619 /* our commit strategy is quite complex.
621 - we first try to commit the changes to all other nodes
623 - if that works, then we commit locally and we are done
625 - if a commit on another node fails, then we need to cancel
626 the transaction, then restart the transaction (thus
627 opening a window of time for a pending recovery to
628 complete), then replay the transaction, checking all the
629 reads and writes (checking that reads give the same data,
630 and writes succeed). Then we retry the transaction to the
635 /* tell ctdbd to commit to the other nodes */
636 rets = ctdbd_control_local(messaging_ctdbd_connection(),
637 CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
638 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
639 if (!NT_STATUS_IS_OK(rets) || status != 0) {
640 tdb_transaction_cancel(h->ctx->wtdb->tdb);
642 if (ctdb_replay_transaction(h) != 0) {
643 DEBUG(0,(__location__ " Failed to replay transaction\n"));
644 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
645 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
646 tdb_null, NULL, NULL, NULL);
647 h->ctx->transaction = NULL;
649 ctx->transaction = NULL;
652 if (retries++ == 10) {
653 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries\n",
654 h->ctx->db_id, retries));
655 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
656 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
657 tdb_null, NULL, NULL, NULL);
658 h->ctx->transaction = NULL;
660 ctx->transaction = NULL;
666 /* do the real commit locally */
667 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
669 DEBUG(0,(__location__ " Failed to commit transaction\n"));
670 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id,
671 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
672 h->ctx->transaction = NULL;
677 /* tell ctdbd that we are finished with our local commit */
678 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
679 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
680 tdb_null, NULL, NULL, NULL);
681 h->ctx->transaction = NULL;
690 static int db_ctdb_transaction_cancel(struct db_context *db)
692 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
694 struct db_ctdb_transaction_handle *h = ctx->transaction;
697 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
701 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
703 ctx->transaction = NULL;
709 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
711 struct db_ctdb_rec *crec = talloc_get_type_abort(
712 rec->private_data, struct db_ctdb_rec);
716 cdata.dsize = sizeof(crec->header) + data.dsize;
718 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
719 return NT_STATUS_NO_MEMORY;
722 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
723 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
725 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
727 SAFE_FREE(cdata.dptr);
729 return (ret == 0) ? NT_STATUS_OK
730 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
735 static NTSTATUS db_ctdb_delete(struct db_record *rec)
740 * We have to store the header with empty data. TODO: Fix the
746 return db_ctdb_store(rec, data, 0);
750 static int db_ctdb_record_destr(struct db_record* data)
752 struct db_ctdb_rec *crec = talloc_get_type_abort(
753 data->private_data, struct db_ctdb_rec);
755 DEBUG(10, (DEBUGLEVEL > 10
756 ? "Unlocking db %u key %s\n"
757 : "Unlocking db %u key %.20s\n",
758 (int)crec->ctdb_ctx->db_id,
759 hex_encode(data, (unsigned char *)data->key.dptr,
762 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
763 DEBUG(0, ("tdb_chainunlock failed\n"));
770 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
775 struct db_record *result;
776 struct db_ctdb_rec *crec;
779 int migrate_attempts = 0;
781 if (!(result = talloc(mem_ctx, struct db_record))) {
782 DEBUG(0, ("talloc failed\n"));
786 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
787 DEBUG(0, ("talloc failed\n"));
792 result->private_data = (void *)crec;
793 crec->ctdb_ctx = ctx;
795 result->key.dsize = key.dsize;
796 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
797 if (result->key.dptr == NULL) {
798 DEBUG(0, ("talloc failed\n"));
804 * Do a blocking lock on the record
808 if (DEBUGLEVEL >= 10) {
809 char *keystr = hex_encode(result, key.dptr, key.dsize);
810 DEBUG(10, (DEBUGLEVEL > 10
811 ? "Locking db %u key %s\n"
812 : "Locking db %u key %.20s\n",
813 (int)crec->ctdb_ctx->db_id, keystr));
817 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
818 DEBUG(3, ("tdb_chainlock failed\n"));
823 result->store = db_ctdb_store;
824 result->delete_rec = db_ctdb_delete;
825 talloc_set_destructor(result, db_ctdb_record_destr);
827 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
830 * See if we have a valid record and we are the dmaster. If so, we can
831 * take the shortcut and just return it.
834 if ((ctdb_data.dptr == NULL) ||
835 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
836 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
838 || (random() % 2 != 0)
841 SAFE_FREE(ctdb_data.dptr);
842 tdb_chainunlock(ctx->wtdb->tdb, key);
843 talloc_set_destructor(result, NULL);
845 migrate_attempts += 1;
847 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
848 ctdb_data.dptr, ctdb_data.dptr ?
849 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
852 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
853 if (!NT_STATUS_IS_OK(status)) {
854 DEBUG(5, ("ctdb_migrate failed: %s\n",
859 /* now it's migrated, try again */
863 if (migrate_attempts > 10) {
864 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
868 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
870 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
871 result->value.dptr = NULL;
873 if ((result->value.dsize != 0)
874 && !(result->value.dptr = (uint8 *)talloc_memdup(
875 result, ctdb_data.dptr + sizeof(crec->header),
876 result->value.dsize))) {
877 DEBUG(0, ("talloc failed\n"));
881 SAFE_FREE(ctdb_data.dptr);
886 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
890 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
893 if (ctx->transaction != NULL) {
894 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
897 if (db->persistent) {
898 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
901 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
905 fetch (unlocked, no migration) operation on ctdb
907 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
908 TDB_DATA key, TDB_DATA *data)
910 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
915 if (ctx->transaction) {
916 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
919 /* try a direct fetch */
920 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
923 * See if we have a valid record and we are the dmaster. If so, we can
924 * take the shortcut and just return it.
925 * we bypass the dmaster check for persistent databases
927 if ((ctdb_data.dptr != NULL) &&
928 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
930 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
931 /* we are the dmaster - avoid the ctdb protocol op */
933 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
934 if (data->dsize == 0) {
935 SAFE_FREE(ctdb_data.dptr);
940 data->dptr = (uint8 *)talloc_memdup(
941 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
944 SAFE_FREE(ctdb_data.dptr);
946 if (data->dptr == NULL) {
952 SAFE_FREE(ctdb_data.dptr);
954 /* we weren't able to get it locally - ask ctdb to fetch it for us */
955 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
956 if (!NT_STATUS_IS_OK(status)) {
957 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
964 struct traverse_state {
965 struct db_context *db;
966 int (*fn)(struct db_record *rec, void *private_data);
970 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
972 struct traverse_state *state = (struct traverse_state *)private_data;
973 struct db_record *rec;
974 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
975 /* we have to give the callback a locked record to prevent races */
976 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
977 if (rec && rec->value.dsize > 0) {
978 state->fn(rec, state->private_data);
980 talloc_free(tmp_ctx);
983 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
986 struct traverse_state *state = (struct traverse_state *)private_data;
987 struct db_record *rec;
988 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
990 /* we have to give the callback a locked record to prevent races */
991 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
992 if (rec && rec->value.dsize > 0) {
993 ret = state->fn(rec, state->private_data);
995 talloc_free(tmp_ctx);
999 static int db_ctdb_traverse(struct db_context *db,
1000 int (*fn)(struct db_record *rec,
1001 void *private_data),
1004 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1005 struct db_ctdb_ctx);
1006 struct traverse_state state;
1010 state.private_data = private_data;
1012 if (db->persistent) {
1013 /* for persistent databases we don't need to do a ctdb traverse,
1014 we can do a faster local traverse */
1015 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1019 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1023 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1025 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1028 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1030 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1033 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1035 struct traverse_state *state = (struct traverse_state *)private_data;
1036 struct db_record rec;
1039 rec.store = db_ctdb_store_deny;
1040 rec.delete_rec = db_ctdb_delete_deny;
1041 rec.private_data = state->db;
1042 state->fn(&rec, state->private_data);
1045 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1048 struct traverse_state *state = (struct traverse_state *)private_data;
1049 struct db_record rec;
1052 rec.store = db_ctdb_store_deny;
1053 rec.delete_rec = db_ctdb_delete_deny;
1054 rec.private_data = state->db;
1056 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1057 /* a deleted record */
1060 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1061 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1063 return state->fn(&rec, state->private_data);
1066 static int db_ctdb_traverse_read(struct db_context *db,
1067 int (*fn)(struct db_record *rec,
1068 void *private_data),
1071 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1072 struct db_ctdb_ctx);
1073 struct traverse_state state;
1077 state.private_data = private_data;
1079 if (db->persistent) {
1080 /* for persistent databases we don't need to do a ctdb traverse,
1081 we can do a faster local traverse */
1082 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1085 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1089 static int db_ctdb_get_seqnum(struct db_context *db)
1091 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1092 struct db_ctdb_ctx);
1093 return tdb_get_seqnum(ctx->wtdb->tdb);
1096 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1098 int hash_size, int tdb_flags,
1099 int open_flags, mode_t mode)
1101 struct db_context *result;
1102 struct db_ctdb_ctx *db_ctdb;
1105 if (!lp_clustering()) {
1106 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1110 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1111 DEBUG(0, ("talloc failed\n"));
1112 TALLOC_FREE(result);
1116 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1117 DEBUG(0, ("talloc failed\n"));
1118 TALLOC_FREE(result);
1122 db_ctdb->transaction = NULL;
1123 db_ctdb->db = result;
1125 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1126 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1127 TALLOC_FREE(result);
1131 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1133 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1135 /* only pass through specific flags */
1136 tdb_flags &= TDB_SEQNUM;
1138 /* honor permissions if user has specified O_CREAT */
1139 if (open_flags & O_CREAT) {
1140 chmod(db_path, mode);
1143 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1144 if (db_ctdb->wtdb == NULL) {
1145 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1146 TALLOC_FREE(result);
1149 talloc_free(db_path);
1151 result->private_data = (void *)db_ctdb;
1152 result->fetch_locked = db_ctdb_fetch_locked;
1153 result->fetch = db_ctdb_fetch;
1154 result->traverse = db_ctdb_traverse;
1155 result->traverse_read = db_ctdb_traverse_read;
1156 result->get_seqnum = db_ctdb_get_seqnum;
1157 result->transaction_start = db_ctdb_transaction_start;
1158 result->transaction_commit = db_ctdb_transaction_commit;
1159 result->transaction_cancel = db_ctdb_transaction_cancel;
1161 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1162 name, db_ctdb->db_id));