/*
   Unix SMB/CIFS implementation.
   Database interface wrapper around ctdbd
   Copyright (C) Volker Lendecke 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-transaction state for a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context the transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* NOTE(review): presumably true while recorded operations are being
	   replayed after a failed commit - confirm against the replay code */
	bool in_replay;
	/*
	 * Operations done under the transaction are recorded in two
	 * marshall buffers: m_all collects both reads and writes,
	 * m_write collects only the writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
};
37 struct db_context *db;
38 struct tdb_wrap *wtdb;
40 struct db_ctdb_transaction_handle *transaction;
44 struct db_ctdb_ctx *ctdb_ctx;
45 struct ctdb_ltdb_header header;
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 enum TDB_ERROR tret = tdb_error(tdb);
60 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
76 form a ctdb_rec_data record from a key/data pair
78 note that header may be NULL. If not NULL then it is included in the data portion
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
83 struct ctdb_ltdb_header *header,
87 struct ctdb_rec_data *d;
89 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
90 data.dsize + (header?sizeof(*header):0);
91 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
97 d->keylen = key.dsize;
98 memcpy(&d->data[0], key.dptr, key.dsize);
100 d->datalen = data.dsize + sizeof(*header);
101 memcpy(&d->data[key.dsize], header, sizeof(*header));
102 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
104 d->datalen = data.dsize;
105 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
113 struct ctdb_marshall_buffer *m,
117 struct ctdb_ltdb_header *header,
120 struct ctdb_rec_data *r;
121 size_t m_size, r_size;
122 struct ctdb_marshall_buffer *m2;
124 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
131 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
138 m_size = talloc_get_size(m);
139 r_size = talloc_get_size(r);
141 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
147 memcpy(m_size + (uint8_t *)m2, r, r_size);
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
160 data.dptr = (uint8_t *)m;
161 data.dsize = talloc_get_size(m);
166 loop over a marshalling buffer
168 - pass r==NULL to start
169 - loop the number of times indicated by m->count
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
173 struct ctdb_ltdb_header *header,
174 TDB_DATA *key, TDB_DATA *data)
177 r = (struct ctdb_rec_data *)&m->data[0];
179 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
187 key->dptr = &r->data[0];
188 key->dsize = r->keylen;
191 data->dptr = &r->data[r->keylen];
192 data->dsize = r->datalen;
193 if (header != NULL) {
194 data->dptr += sizeof(*header);
195 data->dsize -= sizeof(*header);
199 if (header != NULL) {
200 if (r->datalen < sizeof(*header)) {
203 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
211 /* start a transaction on a database */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
214 tdb_transaction_cancel(h->ctx->wtdb->tdb);
218 /* start a transaction on a database */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
221 struct db_record *rh;
224 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
226 struct db_ctdb_ctx *ctx = h->ctx;
229 key.dptr = discard_const(keyname);
230 key.dsize = strlen(keyname);
233 tmp_ctx = talloc_new(h);
235 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
237 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
238 talloc_free(tmp_ctx);
243 ret = tdb_transaction_start(ctx->wtdb->tdb);
245 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246 talloc_free(tmp_ctx);
250 data = tdb_fetch(ctx->wtdb->tdb, key);
251 if ((data.dptr == NULL) ||
252 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254 SAFE_FREE(data.dptr);
255 tdb_transaction_cancel(ctx->wtdb->tdb);
256 talloc_free(tmp_ctx);
260 SAFE_FREE(data.dptr);
261 talloc_free(tmp_ctx);
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
270 struct db_ctdb_transaction_handle *h;
272 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
275 if (!db->persistent) {
276 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
281 if (ctx->transaction) {
282 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
286 h = talloc_zero(db, struct db_ctdb_transaction_handle);
288 DEBUG(0,(__location__ " oom for transaction handle\n"));
294 ret = db_ctdb_transaction_fetch_start(h);
300 talloc_set_destructor(h, db_ctdb_transaction_destructor);
302 ctx->transaction = h;
304 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
312 fetch a record inside a transaction
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
316 TDB_DATA key, TDB_DATA *data)
318 struct db_ctdb_transaction_handle *h = db->transaction;
320 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
322 if (data->dptr != NULL) {
323 uint8_t *oldptr = (uint8_t *)data->dptr;
324 data->dsize -= sizeof(struct ctdb_ltdb_header);
325 if (data->dsize == 0) {
328 data->dptr = (uint8 *)
330 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
334 if (data->dptr == NULL && data->dsize != 0) {
340 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341 if (h->m_all == NULL) {
342 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
344 talloc_free(data->dptr);
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
360 struct db_record *result;
363 if (!(result = talloc(mem_ctx, struct db_record))) {
364 DEBUG(0, ("talloc failed\n"));
368 result->private_data = ctx->transaction;
370 result->key.dsize = key.dsize;
371 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372 if (result->key.dptr == NULL) {
373 DEBUG(0, ("talloc failed\n"));
378 result->store = db_ctdb_store_transaction;
379 result->delete_rec = db_ctdb_delete_transaction;
381 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382 if (ctdb_data.dptr == NULL) {
383 /* create the record */
384 result->value = tdb_null;
388 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389 result->value.dptr = NULL;
391 if ((result->value.dsize != 0)
392 && !(result->value.dptr = (uint8 *)talloc_memdup(
393 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394 result->value.dsize))) {
395 DEBUG(0, ("talloc failed\n"));
399 SAFE_FREE(ctdb_data.dptr);
404 static int db_ctdb_record_destructor(struct db_record *rec)
406 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407 rec->private_data, struct db_ctdb_transaction_handle);
408 int ret = h->ctx->db->transaction_commit(h->ctx->db);
410 DEBUG(0,(__location__ " transaction_commit failed\n"));
416 auto-create a transaction for persistent databases
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
423 struct db_record *rec;
425 res = db_ctdb_transaction_start(ctx->db);
430 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
432 ctx->db->transaction_cancel(ctx->db);
436 /* destroy this transaction when we release the lock */
437 talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
443 stores a record inside a transaction
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
446 TDB_DATA key, TDB_DATA data)
448 TALLOC_CTX *tmp_ctx = talloc_new(h);
451 struct ctdb_ltdb_header header;
453 /* we need the header so we can update the RSN */
454 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455 if (rec.dptr == NULL) {
456 /* the record doesn't exist - create one with us as dmaster.
457 This is only safe because we are in a transaction and this
458 is a persistent database */
460 header.dmaster = get_my_vnn();
462 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
469 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
470 if (h->m_all == NULL) {
471 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
472 talloc_free(tmp_ctx);
476 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
477 if (h->m_write == NULL) {
478 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
479 talloc_free(tmp_ctx);
484 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
485 rec.dptr = talloc_size(tmp_ctx, rec.dsize);
486 if (rec.dptr == NULL) {
487 DEBUG(0,(__location__ " Failed to alloc record\n"));
488 talloc_free(tmp_ctx);
491 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
492 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
494 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
496 talloc_free(tmp_ctx);
503 a record store inside a transaction
505 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
507 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
508 rec->private_data, struct db_ctdb_transaction_handle);
511 ret = db_ctdb_transaction_store(h, rec->key, data);
513 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
519 a record delete inside a transaction
521 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
523 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
524 rec->private_data, struct db_ctdb_transaction_handle);
527 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
529 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
538 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
541 struct ctdb_rec_data *rec = NULL;
545 ret = db_ctdb_transaction_fetch_start(h);
550 for (i=0;i<h->m_all->count;i++) {
553 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
555 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
559 if (rec->reqid == 0) {
561 if (db_ctdb_transaction_store(h, key, data) != 0) {
566 TALLOC_CTX *tmp_ctx = talloc_new(h);
568 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
569 talloc_free(tmp_ctx);
572 if (data2.dsize != data.dsize ||
573 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
574 /* the record has changed on us - we have to give up */
575 talloc_free(tmp_ctx);
578 talloc_free(tmp_ctx);
585 tdb_transaction_cancel(h->ctx->wtdb->tdb);
593 static int db_ctdb_transaction_commit(struct db_context *db)
595 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
600 struct db_ctdb_transaction_handle *h = ctx->transaction;
603 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
607 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
609 if (h->m_write == NULL) {
610 /* no changes were made */
612 ctx->transaction = NULL;
616 talloc_set_destructor(h, NULL);
618 /* our commit strategy is quite complex.
620 - we first try to commit the changes to all other nodes
622 - if that works, then we commit locally and we are done
624 - if a commit on another node fails, then we need to cancel
625 the transaction, then restart the transaction (thus
626 opening a window of time for a pending recovery to
627 complete), then replay the transaction, checking all the
628 reads and writes (checking that reads give the same data,
629 and writes succeed). Then we retry the transaction to the
634 /* tell ctdbd to commit to the other nodes */
635 rets = ctdbd_control_local(messaging_ctdbd_connection(),
636 CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
637 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
638 if (!NT_STATUS_IS_OK(rets) || status != 0) {
639 tdb_transaction_cancel(h->ctx->wtdb->tdb);
641 if (ctdb_replay_transaction(h) != 0) {
642 DEBUG(0,(__location__ " Failed to replay transaction\n"));
643 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
644 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
645 tdb_null, NULL, NULL, NULL);
646 h->ctx->transaction = NULL;
648 ctx->transaction = NULL;
654 /* do the real commit locally */
655 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
657 DEBUG(0,(__location__ " Failed to commit transaction\n"));
658 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id,
659 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
660 h->ctx->transaction = NULL;
665 /* tell ctdbd that we are finished with our local commit */
666 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
667 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
668 tdb_null, NULL, NULL, NULL);
669 h->ctx->transaction = NULL;
678 static int db_ctdb_transaction_cancel(struct db_context *db)
680 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
682 struct db_ctdb_transaction_handle *h = ctx->transaction;
685 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
689 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
691 ctx->transaction = NULL;
697 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
699 struct db_ctdb_rec *crec = talloc_get_type_abort(
700 rec->private_data, struct db_ctdb_rec);
704 cdata.dsize = sizeof(crec->header) + data.dsize;
706 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
707 return NT_STATUS_NO_MEMORY;
710 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
711 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
713 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
715 SAFE_FREE(cdata.dptr);
717 return (ret == 0) ? NT_STATUS_OK
718 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
723 static NTSTATUS db_ctdb_delete(struct db_record *rec)
728 * We have to store the header with empty data. TODO: Fix the
734 return db_ctdb_store(rec, data, 0);
738 static int db_ctdb_record_destr(struct db_record* data)
740 struct db_ctdb_rec *crec = talloc_get_type_abort(
741 data->private_data, struct db_ctdb_rec);
743 DEBUG(10, (DEBUGLEVEL > 10
744 ? "Unlocking db %u key %s\n"
745 : "Unlocking db %u key %.20s\n",
746 (int)crec->ctdb_ctx->db_id,
747 hex_encode(data, (unsigned char *)data->key.dptr,
750 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
751 DEBUG(0, ("tdb_chainunlock failed\n"));
758 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
763 struct db_record *result;
764 struct db_ctdb_rec *crec;
767 int migrate_attempts = 0;
769 if (!(result = talloc(mem_ctx, struct db_record))) {
770 DEBUG(0, ("talloc failed\n"));
774 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
775 DEBUG(0, ("talloc failed\n"));
780 result->private_data = (void *)crec;
781 crec->ctdb_ctx = ctx;
783 result->key.dsize = key.dsize;
784 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
785 if (result->key.dptr == NULL) {
786 DEBUG(0, ("talloc failed\n"));
792 * Do a blocking lock on the record
796 if (DEBUGLEVEL >= 10) {
797 char *keystr = hex_encode(result, key.dptr, key.dsize);
798 DEBUG(10, (DEBUGLEVEL > 10
799 ? "Locking db %u key %s\n"
800 : "Locking db %u key %.20s\n",
801 (int)crec->ctdb_ctx->db_id, keystr));
805 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
806 DEBUG(3, ("tdb_chainlock failed\n"));
811 result->store = db_ctdb_store;
812 result->delete_rec = db_ctdb_delete;
813 talloc_set_destructor(result, db_ctdb_record_destr);
815 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
818 * See if we have a valid record and we are the dmaster. If so, we can
819 * take the shortcut and just return it.
822 if ((ctdb_data.dptr == NULL) ||
823 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
824 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
826 || (random() % 2 != 0)
829 SAFE_FREE(ctdb_data.dptr);
830 tdb_chainunlock(ctx->wtdb->tdb, key);
831 talloc_set_destructor(result, NULL);
833 migrate_attempts += 1;
835 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
836 ctdb_data.dptr, ctdb_data.dptr ?
837 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
840 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
841 if (!NT_STATUS_IS_OK(status)) {
842 DEBUG(5, ("ctdb_migrate failed: %s\n",
847 /* now its migrated, try again */
851 if (migrate_attempts > 10) {
852 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
856 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
858 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
859 result->value.dptr = NULL;
861 if ((result->value.dsize != 0)
862 && !(result->value.dptr = (uint8 *)talloc_memdup(
863 result, ctdb_data.dptr + sizeof(crec->header),
864 result->value.dsize))) {
865 DEBUG(0, ("talloc failed\n"));
869 SAFE_FREE(ctdb_data.dptr);
874 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
878 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
881 if (ctx->transaction != NULL) {
882 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
885 if (db->persistent) {
886 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
889 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
893 fetch (unlocked, no migration) operation on ctdb
895 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
896 TDB_DATA key, TDB_DATA *data)
898 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
903 if (ctx->transaction) {
904 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
907 /* try a direct fetch */
908 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
911 * See if we have a valid record and we are the dmaster. If so, we can
912 * take the shortcut and just return it.
913 * we bypass the dmaster check for persistent databases
915 if ((ctdb_data.dptr != NULL) &&
916 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
918 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
919 /* we are the dmaster - avoid the ctdb protocol op */
921 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
922 if (data->dsize == 0) {
923 SAFE_FREE(ctdb_data.dptr);
928 data->dptr = (uint8 *)talloc_memdup(
929 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
932 SAFE_FREE(ctdb_data.dptr);
934 if (data->dptr == NULL) {
940 SAFE_FREE(ctdb_data.dptr);
942 /* we weren't able to get it locally - ask ctdb to fetch it for us */
943 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
944 if (!NT_STATUS_IS_OK(status)) {
945 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * State handed to the traverse callbacks through their private pointer.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
958 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
960 struct traverse_state *state = (struct traverse_state *)private_data;
961 struct db_record *rec;
962 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
963 /* we have to give them a locked record to prevent races */
964 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
965 if (rec && rec->value.dsize > 0) {
966 state->fn(rec, state->private_data);
968 talloc_free(tmp_ctx);
971 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
974 struct traverse_state *state = (struct traverse_state *)private_data;
975 struct db_record *rec;
976 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
978 /* we have to give them a locked record to prevent races */
979 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
980 if (rec && rec->value.dsize > 0) {
981 ret = state->fn(rec, state->private_data);
983 talloc_free(tmp_ctx);
987 static int db_ctdb_traverse(struct db_context *db,
988 int (*fn)(struct db_record *rec,
992 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
994 struct traverse_state state;
998 state.private_data = private_data;
1000 if (db->persistent) {
1001 /* for persistent databases we don't need to do a ctdb traverse,
1002 we can do a faster local traverse */
1003 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1007 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1011 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1013 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1016 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1018 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1021 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1023 struct traverse_state *state = (struct traverse_state *)private_data;
1024 struct db_record rec;
1027 rec.store = db_ctdb_store_deny;
1028 rec.delete_rec = db_ctdb_delete_deny;
1029 rec.private_data = state->db;
1030 state->fn(&rec, state->private_data);
1033 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1036 struct traverse_state *state = (struct traverse_state *)private_data;
1037 struct db_record rec;
1040 rec.store = db_ctdb_store_deny;
1041 rec.delete_rec = db_ctdb_delete_deny;
1042 rec.private_data = state->db;
1044 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1045 /* a deleted record */
1048 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1049 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1051 return state->fn(&rec, state->private_data);
1054 static int db_ctdb_traverse_read(struct db_context *db,
1055 int (*fn)(struct db_record *rec,
1056 void *private_data),
1059 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1060 struct db_ctdb_ctx);
1061 struct traverse_state state;
1065 state.private_data = private_data;
1067 if (db->persistent) {
1068 /* for persistent databases we don't need to do a ctdb traverse,
1069 we can do a faster local traverse */
1070 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1073 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1077 static int db_ctdb_get_seqnum(struct db_context *db)
1079 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1080 struct db_ctdb_ctx);
1081 return tdb_get_seqnum(ctx->wtdb->tdb);
1084 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1086 int hash_size, int tdb_flags,
1087 int open_flags, mode_t mode)
1089 struct db_context *result;
1090 struct db_ctdb_ctx *db_ctdb;
1093 if (!lp_clustering()) {
1094 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1098 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1099 DEBUG(0, ("talloc failed\n"));
1100 TALLOC_FREE(result);
1104 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1105 DEBUG(0, ("talloc failed\n"));
1106 TALLOC_FREE(result);
1110 db_ctdb->transaction = NULL;
1111 db_ctdb->db = result;
1113 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1114 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1115 TALLOC_FREE(result);
1119 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1121 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1123 /* only pass through specific flags */
1124 tdb_flags &= TDB_SEQNUM;
1126 /* honor permissions if user has specified O_CREAT */
1127 if (open_flags & O_CREAT) {
1128 chmod(db_path, mode);
1131 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1132 if (db_ctdb->wtdb == NULL) {
1133 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1134 TALLOC_FREE(result);
1137 talloc_free(db_path);
1139 result->private_data = (void *)db_ctdb;
1140 result->fetch_locked = db_ctdb_fetch_locked;
1141 result->fetch = db_ctdb_fetch;
1142 result->traverse = db_ctdb_traverse;
1143 result->traverse_read = db_ctdb_traverse_read;
1144 result->get_seqnum = db_ctdb_get_seqnum;
1145 result->transaction_start = db_ctdb_transaction_start;
1146 result->transaction_commit = db_ctdb_transaction_commit;
1147 result->transaction_cancel = db_ctdb_transaction_cancel;
1149 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1150 name, db_ctdb->db_id));