2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State kept for one open transaction: the owning database context and
 * two marshall buffers logging what was done while the transaction is open.
 * In this file m_all receives both reads and writes, m_write only writes.
 */
26 struct db_ctdb_transaction_handle {
27 struct db_ctdb_ctx *ctx;
29 /* we store the reads and writes done under a transaction one
30 list stores both reads and writes, the other just writes
32 struct ctdb_marshall_buffer *m_all;
33 struct ctdb_marshall_buffer *m_write;
/*
 * NOTE(review): the enclosing struct header is not visible in this excerpt.
 * These look like members of struct db_ctdb_ctx (the code below uses
 * ctx->wtdb and ctx->transaction): the wrapped local tdb, and the currently
 * open transaction, which is NULL when no transaction is active.
 */
37 struct tdb_wrap *wtdb;
39 struct db_ctdb_transaction_handle *transaction;
/*
 * NOTE(review): the enclosing struct header is not visible in this excerpt.
 * These look like members of struct db_ctdb_rec, the private data of a
 * locked record: a back pointer to the database context and a copy of the
 * record's ltdb header, which the store callbacks write back in front of the
 * data.
 */
43 struct db_ctdb_ctx *ctdb_ctx;
44 struct ctdb_ltdb_header header;
/*
 * Forward declaration: db_ctdb_transaction_fetch_start() below calls this
 * before the definition appears.
 */
47 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
/*
 * Translate the error currently recorded on a tdb handle (tdb_error()) into
 * an NTSTATUS code.
 *
 * The assignments visible below produce NT_STATUS_OBJECT_NAME_COLLISION,
 * NT_STATUS_OBJECT_NAME_NOT_FOUND or NT_STATUS_INTERNAL_DB_CORRUPTION.
 * NOTE(review): which tdb error selects which status is not visible in this
 * excerpt - confirm the mapping against the full function.
 */
52 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
55 enum TDB_ERROR tret = tdb_error(tdb);
59 status = NT_STATUS_OBJECT_NAME_COLLISION;
62 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
65 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
75 form a ctdb_rec_data record from a key/data pair
77 note that header may be NULL. If not NULL then it is included in the data portion
/*
 * Allocate (on mem_ctx) and fill a single ctdb_rec_data record.
 *
 * The allocation size is the offset of the data member plus the key size,
 * the data size and, when a header is supplied, the header size.
 * The payload is laid out as: key bytes, then the ltdb header when one is
 * given, then the data bytes. datalen includes the header when present.
 *
 * NOTE(review): the delete path in this file passes tdb_null as data; if
 * that is a NULL/0 pair, the memcpy calls below receive a NULL source with
 * length 0, which ISO C leaves undefined - consider guarding on data.dsize.
 */
80 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
82 struct ctdb_ltdb_header *header,
86 struct ctdb_rec_data *d;
88 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
89 data.dsize + (header?sizeof(*header):0);
90 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
96 d->keylen = key.dsize;
97 memcpy(&d->data[0], key.dptr, key.dsize);
99 d->datalen = data.dsize + sizeof(*header);
100 memcpy(&d->data[key.dsize], header, sizeof(*header));
101 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103 d->datalen = data.dsize;
104 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
110 /* helper function for marshalling multiple records
 *
 * Builds one record from key/header/data with db_ctdb_marshall_record() and
 * appends its bytes to the marshall buffer m, growing m by the record's
 * talloc size. A zeroed buffer header is allocated on mem_ctx where the
 * talloc_zero_size() call below is reached.
 *
 * Both sizes come from talloc_get_size(), so the length of the buffer is
 * simply the size of its talloc chunk; db_ctdb_marshall_finish() relies on
 * the same convention.
 *
 * NOTE(review): the error paths are not visible in this excerpt; check that
 * the temporary record r is released on every exit, including a failed
 * realloc.
 */
111 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
112 struct ctdb_marshall_buffer *m,
116 struct ctdb_ltdb_header *header,
119 struct ctdb_rec_data *r;
120 size_t m_size, r_size;
121 struct ctdb_marshall_buffer *m2;
123 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
130 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
137 m_size = talloc_get_size(m);
138 r_size = talloc_get_size(r);
140 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
146 memcpy(m_size + (uint8_t *)m2, r, r_size);
155 /* we've finished marshalling, return a data blob with the marshalled records
 *
 * No copy is made: dptr aliases m and dsize is the talloc size of m, so the
 * blob is only usable for as long as m itself is.
 */
156 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
159 data.dptr = (uint8_t *)m;
160 data.dsize = talloc_get_size(m);
165 loop over a marshalling buffer
167 - pass r==NULL to start
168 - loop the number of times indicated by m->count
/*
 * Iterator over the records packed into a marshall buffer.
 *
 * The first record starts at m->data[0]; each following one is found by
 * advancing r->length bytes from the current record. key and data are set
 * to point into the buffer itself (nothing is copied). When header is
 * non-NULL the data pointer and size skip the leading ltdb header, and the
 * header is copied out to *header.
 *
 * NOTE(review): data->dsize is reduced by sizeof(*header) before r->datalen
 * is compared against sizeof(*header) further down, so for a short record
 * the size wraps before the record is rejected. Callers should not look at
 * *data when that check fails - confirm.
 */
170 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172 struct ctdb_ltdb_header *header,
173 TDB_DATA *key, TDB_DATA *data)
176 r = (struct ctdb_rec_data *)&m->data[0];
178 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
186 key->dptr = &r->data[0];
187 key->dsize = r->keylen;
190 data->dptr = &r->data[r->keylen];
191 data->dsize = r->datalen;
192 if (header != NULL) {
193 data->dptr += sizeof(*header);
194 data->dsize -= sizeof(*header);
198 if (header != NULL) {
199 if (r->datalen < sizeof(*header)) {
202 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
210 /* talloc destructor for a transaction handle: cancels the tdb transaction
 * on the local database the handle belongs to */
211 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 tdb_transaction_cancel(h->ctx->wtdb->tdb);
217 /* begin the local side of a transaction
 *
 * Takes the record stored under CTDB_TRANSACTION_LOCK_KEY through
 * fetch_locked_internal() (persistent mode), starts a tdb transaction on the
 * local database and then re-reads the lock record inside that transaction.
 * If the record is missing, shorter than a ctdb_ltdb_header, or its dmaster
 * is not this node (get_my_vnn()), the tdb transaction is cancelled again.
 * Temporary allocations hang off tmp_ctx, a talloc child of h.
 *
 * NOTE(review): what happens after that cancel (retry or failure) is not
 * visible in this excerpt.
 */
218 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 struct db_record *rh;
223 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225 struct db_ctdb_ctx *ctx = h->ctx;
228 key.dptr = discard_const(keyname);
229 key.dsize = strlen(keyname);
232 tmp_ctx = talloc_new(h);
234 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
237 talloc_free(tmp_ctx);
242 ret = tdb_transaction_start(ctx->wtdb->tdb);
244 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
245 talloc_free(tmp_ctx);
249 data = tdb_fetch(ctx->wtdb->tdb, key);
250 if ((data.dptr == NULL) ||
251 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
252 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
253 SAFE_FREE(data.dptr);
254 tdb_transaction_cancel(ctx->wtdb->tdb);
255 talloc_free(tmp_ctx);
259 SAFE_FREE(data.dptr);
260 talloc_free(tmp_ctx);
266 /* start a transaction on a database
 *
 * Refuses non-persistent databases and nested transactions (both are logged
 * at level 0). Otherwise a zeroed handle is allocated as a talloc child of
 * db, db_ctdb_transaction_fetch_start() is run on it,
 * db_ctdb_transaction_destructor is installed and the handle is published in
 * ctx->transaction.
 */
267 static int db_ctdb_transaction_start(struct db_context *db)
269 struct db_ctdb_transaction_handle *h;
271 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
274 if (!db->persistent) {
275 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
280 if (ctx->transaction) {
281 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
285 h = talloc_zero(db, struct db_ctdb_transaction_handle);
287 DEBUG(0,(__location__ " oom for transaction handle\n"));
293 ret = db_ctdb_transaction_fetch_start(h);
299 talloc_set_destructor(h, db_ctdb_transaction_destructor);
301 ctx->transaction = h;
303 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
311 fetch a record inside a transaction
/*
 * Fetch a record while a transaction is open.
 *
 * Reads the raw record from the local tdb. When one exists, the leading
 * ctdb_ltdb_header is skipped and the remaining bytes are handed back in a
 * buffer created from mem_ctx. The read is then logged in h->m_all with
 * reqid 1, which is how ctdb_replay_transaction() recognises reads.
 *
 * NOTE(review): dsize is reduced by the header size without a visible check
 * that the stored record is at least that large; a short record would wrap
 * the size.
 */
313 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
315 TDB_DATA key, TDB_DATA *data)
317 struct db_ctdb_transaction_handle *h = db->transaction;
319 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321 if (data->dptr != NULL) {
322 uint8_t *oldptr = (uint8_t *)data->dptr;
323 data->dsize -= sizeof(struct ctdb_ltdb_header);
324 data->dptr = (uint8 *)
326 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
329 if (data->dptr == NULL) {
335 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
336 if (h->m_all == NULL) {
337 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
339 talloc_free(data->dptr);
/*
 * Forward declarations: db_ctdb_fetch_locked_transaction() installs these two
 * functions as the store/delete callbacks of the records it hands out.
 */
348 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
349 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
/*
 * fetch_locked variant used while a transaction is open.
 *
 * Builds a db_record on mem_ctx whose private_data is the open transaction
 * handle and whose store/delete callbacks are the transaction variants.
 * The key is duplicated onto the record. The value is read from the local
 * tdb: a missing record yields tdb_null, otherwise the bytes following the
 * ctdb_ltdb_header are duplicated onto the record. The buffer returned by
 * tdb_fetch() is released with SAFE_FREE(). No tdb chain lock is taken in
 * the visible code.
 *
 * NOTE(review): the value size is computed as dsize minus the header size
 * with no visible check that the stored record is at least header-sized.
 */
351 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
355 struct db_record *result;
358 if (!(result = talloc(mem_ctx, struct db_record))) {
359 DEBUG(0, ("talloc failed\n"));
363 result->private_data = ctx->transaction;
365 result->key.dsize = key.dsize;
366 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
367 if (result->key.dptr == NULL) {
368 DEBUG(0, ("talloc failed\n"));
373 result->store = db_ctdb_store_transaction;
374 result->delete_rec = db_ctdb_delete_transaction;
376 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
377 if (ctdb_data.dptr == NULL) {
378 /* create the record */
379 result->value = tdb_null;
383 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
384 result->value.dptr = NULL;
386 if ((result->value.dsize != 0)
387 && !(result->value.dptr = (uint8 *)talloc_memdup(
388 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
389 result->value.dsize))) {
390 DEBUG(0, ("talloc failed\n"));
394 SAFE_FREE(ctdb_data.dptr);
401 stores a record inside a transaction
/*
 * Store a record while a transaction is open.
 *
 * The current record is fetched first to obtain its ltdb header; a record
 * that does not exist yet gets a header naming this node as dmaster.
 * The write is logged twice: without header in h->m_all (reqid 0, which
 * ctdb_replay_transaction() treats as a write) and with header in h->m_write
 * (the buffer sent to ctdbd at commit time). Finally header + data are
 * written to the local tdb with TDB_REPLACE.
 *
 * tmp_ctx (a talloc child of h) owns the temporary record buffer and is
 * freed on every visible exit.
 *
 * NOTE(review): the header of an existing record is copied with a fixed-size
 * memcpy; no check that rec.dsize is at least sizeof(header) is visible.
 */
403 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
404 TDB_DATA key, TDB_DATA data)
406 TALLOC_CTX *tmp_ctx = talloc_new(h);
409 struct ctdb_ltdb_header header;
411 /* we need the header so we can update the RSN */
412 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
413 if (rec.dptr == NULL) {
414 /* the record doesn't exist - create one with us as dmaster.
415 This is only safe because we are in a transaction and this
416 is a persistent database */
418 header.dmaster = get_my_vnn();
420 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
427 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
428 if (h->m_all == NULL) {
429 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
430 talloc_free(tmp_ctx);
434 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
435 if (h->m_write == NULL) {
436 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
437 talloc_free(tmp_ctx);
442 rec.dptr = talloc_size(tmp_ctx, data.dsize + sizeof(struct ctdb_ltdb_header));
443 if (rec.dptr == NULL) {
444 DEBUG(0,(__location__ " Failed to alloc record\n"));
445 talloc_free(tmp_ctx);
448 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
449 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
451 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
453 talloc_free(tmp_ctx);
460 a record store inside a transaction
/*
 * db_record store callback used inside a transaction.
 *
 * Recovers the transaction handle from rec->private_data and forwards the
 * key and data to db_ctdb_transaction_store(); a tdb error is translated
 * with tdb_error_to_ntstatus().
 *
 * NOTE(review): flag is not passed on in the visible code, so insert/modify
 * style flags are apparently not honoured here - confirm this is intended.
 */
462 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
464 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
465 rec->private_data, struct db_ctdb_transaction_handle);
468 ret = db_ctdb_transaction_store(h, rec->key, data);
470 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
476 a record delete inside a transaction
/*
 * db_record delete callback used inside a transaction.
 *
 * A delete is expressed as a store of tdb_null through
 * db_ctdb_transaction_store(), so the record keeps its header and loses its
 * data. A tdb error is translated with tdb_error_to_ntstatus().
 */
478 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
480 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
481 rec->private_data, struct db_ctdb_transaction_handle);
484 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
486 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
 * Replay a transaction after a failed commit attempt.
 *
 * Restarts the local transaction with db_ctdb_transaction_fetch_start() and
 * walks every entry logged in h->m_all. Entries with reqid 0 are writes and
 * are stored again. The remaining entries are reads: they are fetched again
 * and compared, size and bytes, with the logged value; a difference means
 * the replay has to be given up. The tdb transaction is cancelled on the
 * failure path visible at the end.
 *
 * NOTE(review): db_ctdb_transaction_store() and db_ctdb_transaction_fetch()
 * append to h->m_all themselves. Unless that is suppressed on lines not
 * visible here, the buffer iterated by this loop grows (and may be moved by
 * the realloc) while rec still points into it - confirm this is handled.
 */
495 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
498 struct ctdb_rec_data *rec = NULL;
502 ret = db_ctdb_transaction_fetch_start(h);
507 for (i=0;i<h->m_all->count;i++) {
510 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
512 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
516 if (rec->reqid == 0) {
518 if (db_ctdb_transaction_store(h, key, data) != 0) {
523 TALLOC_CTX *tmp_ctx = talloc_new(h);
525 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
526 talloc_free(tmp_ctx);
529 if (data2.dsize != data.dsize ||
530 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
531 /* the record has changed on us - we have to give up */
532 talloc_free(tmp_ctx);
535 talloc_free(tmp_ctx);
542 tdb_transaction_cancel(h->ctx->wtdb->tdb);
/*
 * Commit the open transaction.
 *
 * With no recorded writes (h->m_write == NULL) nothing has to be pushed and
 * the transaction is simply dropped. Otherwise the handle's destructor is
 * removed and the write buffer is sent to ctdbd with
 * CTDB_CONTROL_TRANS2_COMMIT. If that control fails, the local tdb
 * transaction is cancelled and the transaction is replayed; a failed replay
 * is reported to ctdbd with CTDB_CONTROL_TRANS2_ERROR.
 * Afterwards the local tdb transaction is committed and ctdbd is told the
 * outcome: CTDB_CONTROL_TRANS2_ERROR when the local commit failed,
 * CTDB_CONTROL_TRANS2_FINISHED otherwise. The context's transaction pointer
 * is cleared on every exit visible here.
 */
550 static int db_ctdb_transaction_commit(struct db_context *db)
552 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
557 struct db_ctdb_transaction_handle *h = ctx->transaction;
560 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
564 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
566 if (h->m_write == NULL) {
567 /* no changes were made */
569 ctx->transaction = NULL;
573 talloc_set_destructor(h, NULL);
575 /* our commit strategy is quite complex.
577 - we first try to commit the changes to all other nodes
579 - if that works, then we commit locally and we are done
581 - if a commit on another node fails, then we need to cancel
582 the transaction, then restart the transaction (thus
583 opening a window of time for a pending recovery to
584 complete), then replay the transaction, checking all the
585 reads and writes (checking that reads give the same data,
586 and writes succeed). Then we retry the transaction to the
591 /* tell ctdbd to commit to the other nodes */
592 rets = ctdbd_control_local(messaging_ctdbd_connection(),
593 CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
594 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
595 if (!NT_STATUS_IS_OK(rets) || status != 0) {
596 tdb_transaction_cancel(h->ctx->wtdb->tdb);
598 if (ctdb_replay_transaction(h) != 0) {
599 DEBUG(0,(__location__ " Failed to replay transaction\n"));
600 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
601 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
602 tdb_null, NULL, NULL, NULL);
603 h->ctx->transaction = NULL;
605 ctx->transaction = NULL;
611 /* do the real commit locally */
612 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
614 DEBUG(0,(__location__ " Failed to commit transaction\n"));
615 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id,
616 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
617 h->ctx->transaction = NULL;
622 /* tell ctdbd that we are finished with our local commit */
623 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
624 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
625 tdb_null, NULL, NULL, NULL);
626 h->ctx->transaction = NULL;
/*
 * Cancel the open transaction of a database.
 *
 * Logs at level 0 when no transaction is open. Otherwise the cancel is
 * logged at level 5 and ctx->transaction is cleared.
 *
 * NOTE(review): how the handle h itself is released is not visible in this
 * excerpt; its talloc destructor is what cancels the tdb transaction.
 */
635 static int db_ctdb_transaction_cancel(struct db_context *db)
637 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
639 struct db_ctdb_transaction_handle *h = ctx->transaction;
642 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
646 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
648 ctx->transaction = NULL;
/*
 * Plain db_record store callback installed by fetch_locked_internal().
 *
 * Prepends the ltdb header kept in the record's private db_ctdb_rec to the
 * new data in a malloc'ed buffer and writes the result to the local tdb with
 * TDB_REPLACE. The buffer is freed before returning. Returns
 * NT_STATUS_NO_MEMORY when the buffer cannot be allocated, NT_STATUS_OK when
 * tdb_store() returns 0, and the translated tdb error otherwise.
 *
 * NOTE(review): flag is not consulted in the visible code; the store is
 * always a replace.
 */
654 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
656 struct db_ctdb_rec *crec = talloc_get_type_abort(
657 rec->private_data, struct db_ctdb_rec);
661 cdata.dsize = sizeof(crec->header) + data.dsize;
663 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
664 return NT_STATUS_NO_MEMORY;
667 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
668 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
670 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
672 SAFE_FREE(cdata.dptr);
674 return (ret == 0) ? NT_STATUS_OK
675 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
679 /* for persistent databases the store is a bit different. We have to
680 ask the ctdb daemon to push the record to all nodes after the
/*
 * db_record store callback for persistent databases.
 *
 * The loop below runs while the status is not OK and the attempt count is
 * below the limit read with lp_parm_int() ("dbwrap ctdb" / "max store
 * retries", called with 5 as last argument). The steps visible per attempt:
 *  - the record is fetched and locked through fetch_locked_internal(), with
 *    the result stored in record
 *  - header + data are assembled in a malloc'ed buffer
 *  - the update is announced with ctdbd_start_persistent_update()
 *  - if that succeeded, the record is written to the local tdb
 *  - the chain lock is dropped and the destructor of record is removed
 *  - ctdbd is then asked to push the record to the other nodes
 *    (ctdbd_persistent_store), or the announced update is cancelled
 *
 * NOTE(review): the early return on allocation failure inside the loop
 * bypasses the unlock further down; the lock then depends on the destructor
 * of record, which runs when the caller frees rec - confirm.
 */
682 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
684 struct db_ctdb_rec *crec;
685 struct db_record *record;
690 int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
692 for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
693 (count < max_retries) && !NT_STATUS_IS_OK(status);
699 * There is a hack here: We use rec as a memory
700 * context and re-use it as the record struct ptr.
701 * We don't free the record data allocated
702 * in each turn. So all gets freed when the caller
703 * releases the original record. This is because
704 * we don't get the record passed in by reference
705 * in the first place and the caller relies on
706 * having to free the record himself.
708 record = fetch_locked_internal(crec->ctdb_ctx,
711 true /* persistent */);
712 if (record == NULL) {
713 DEBUG(5, ("fetch_locked_internal failed.\n"));
714 status = NT_STATUS_NO_MEMORY;
719 crec = talloc_get_type_abort(record->private_data,
722 cdata.dsize = sizeof(crec->header) + data.dsize;
724 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
725 return NT_STATUS_NO_MEMORY;
730 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
731 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
733 status = ctdbd_start_persistent_update(
734 messaging_ctdbd_connection(),
735 crec->ctdb_ctx->db_id,
739 if (NT_STATUS_IS_OK(status)) {
740 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
744 : tdb_error_to_ntstatus(
745 crec->ctdb_ctx->wtdb->tdb);
749 * release the lock *now* in order to prevent deadlocks.
751 * There is a tradeoff: Usually, the record is still locked
752 * after db->store operation. This lock is usually released
753 * via the talloc destructor with the TALLOC_FREE to
754 * the record. So we have two choices:
756 * - Either re-lock the record after the call to persistent_store
757 * or cancel_persistent update and this way not changing any
758 * assumptions callers may have about the state, but possibly
759 * introducing new race conditions.
761 * - Or don't lock the record again but just remove the
762 * talloc_destructor. This is less racy but assumes that
763 * the lock is always released via TALLOC_FREE of the record.
765 * I choose the first variant for now since it seems less racy.
766 * We can't guarantee that we succeed in getting the lock
767 * anyways. The only real danger here is that a caller
768 * performs multiple store operations after a fetch_locked()
769 * which is currently not the case.
771 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
772 talloc_set_destructor(record, NULL);
774 /* now tell ctdbd to update this record on all other nodes */
775 if (NT_STATUS_IS_OK(status)) {
776 status = ctdbd_persistent_store(
777 messaging_ctdbd_connection(),
778 crec->ctdb_ctx->db_id,
782 ctdbd_cancel_persistent_update(
783 messaging_ctdbd_connection(),
784 crec->ctdb_ctx->db_id,
790 SAFE_FREE(cdata.dptr);
793 if (!NT_STATUS_IS_OK(status)) {
794 DEBUG(5, ("ctdbd_persistent_store failed after "
795 "%d retries with error %s - giving up.\n",
796 count, nt_errstr(status)));
799 SAFE_FREE(cdata.dptr);
/*
 * Plain db_record delete callback: implemented as a db_ctdb_store() call
 * with flag 0, so the record is not removed from the tdb but rewritten.
 * NOTE(review): the initialisation of data is not visible here; going by
 * the comment fragment below it is presumably empty - confirm.
 */
804 static NTSTATUS db_ctdb_delete(struct db_record *rec)
809 * We have to store the header with empty data. TODO: Fix the
815 return db_ctdb_store(rec, data, 0);
/*
 * db_record delete callback for persistent databases: implemented as a
 * db_ctdb_store_persistent() call with flag 0, so the record is rewritten
 * rather than removed.
 * NOTE(review): the initialisation of data is not visible here; going by
 * the comment fragment below it is presumably empty - confirm.
 */
819 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
824 * We have to store the header with empty data. TODO: Fix the
830 return db_ctdb_store_persistent(rec, data, 0);
/*
 * talloc destructor of a locked record.
 *
 * Logs the key being unlocked at debug level 10 (hex encoded, cut to 20
 * characters unless DEBUGLEVEL is above 10) and releases the tdb chain lock
 * for the record's key. A failing unlock is logged at level 0.
 */
834 static int db_ctdb_record_destr(struct db_record* data)
836 struct db_ctdb_rec *crec = talloc_get_type_abort(
837 data->private_data, struct db_ctdb_rec);
839 DEBUG(10, (DEBUGLEVEL > 10
840 ? "Unlocking db %u key %s\n"
841 : "Unlocking db %u key %.20s\n",
842 (int)crec->ctdb_ctx->db_id,
843 hex_encode(data, (unsigned char *)data->key.dptr,
846 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
847 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Produce a locked db_record for key, asking ctdbd to migrate the record to
 * this node first when the local copy cannot be used.
 *
 * With a transaction open the request is handed to
 * db_ctdb_fetch_locked_transaction(). Otherwise a db_record with a private
 * db_ctdb_rec is allocated on mem_ctx, the key is duplicated onto it and the
 * tdb chain for the key is locked. db_ctdb_record_destr is installed so that
 * freeing the record drops the lock. The store/delete callbacks are set to
 * either the persistent or the plain variants.
 *
 * If the local copy is missing, too short to hold a ctdb_ltdb_header, or
 * its dmaster is not this node (get_my_vnn()), the lock is released, the
 * destructor is removed and ctdbd_migrate() is called for the key.
 * Otherwise the header is copied into the db_ctdb_rec and the bytes after it
 * are duplicated into result->value.
 *
 * NOTE(review): the "random() % 2" clause in the condition is presumably
 * compiled out by preprocessor lines that are not visible here - confirm.
 */
854 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
859 struct db_record *result;
860 struct db_ctdb_rec *crec;
863 int migrate_attempts = 0;
865 if (ctx->transaction != NULL) {
866 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
869 if (!(result = talloc(mem_ctx, struct db_record))) {
870 DEBUG(0, ("talloc failed\n"));
874 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
875 DEBUG(0, ("talloc failed\n"));
880 result->private_data = (void *)crec;
881 crec->ctdb_ctx = ctx;
883 result->key.dsize = key.dsize;
884 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
885 if (result->key.dptr == NULL) {
886 DEBUG(0, ("talloc failed\n"));
892 * Do a blocking lock on the record
896 if (DEBUGLEVEL >= 10) {
897 char *keystr = hex_encode(result, key.dptr, key.dsize);
898 DEBUG(10, (DEBUGLEVEL > 10
899 ? "Locking db %u key %s\n"
900 : "Locking db %u key %.20s\n",
901 (int)crec->ctdb_ctx->db_id, keystr));
905 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
906 DEBUG(3, ("tdb_chainlock failed\n"));
912 result->store = db_ctdb_store_persistent;
913 result->delete_rec = db_ctdb_delete_persistent;
915 result->store = db_ctdb_store;
916 result->delete_rec = db_ctdb_delete;
918 talloc_set_destructor(result, db_ctdb_record_destr);
920 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
923 * See if we have a valid record and we are the dmaster. If so, we can
924 * take the shortcut and just return it.
927 if ((ctdb_data.dptr == NULL) ||
928 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
929 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
931 || (random() % 2 != 0)
934 SAFE_FREE(ctdb_data.dptr);
935 tdb_chainunlock(ctx->wtdb->tdb, key);
936 talloc_set_destructor(result, NULL);
938 migrate_attempts += 1;
/* NOTE(review): ctdb_data.dptr was already handed to SAFE_FREE above; if
 * that macro resets the pointer, the message below can only ever print a
 * NULL pointer and -1. Log before freeing if the values are wanted. */
940 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
941 ctdb_data.dptr, ctdb_data.dptr ?
942 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
945 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
946 if (!NT_STATUS_IS_OK(status)) {
947 DEBUG(5, ("ctdb_migrate failed: %s\n",
952 /* now it's migrated, try again */
956 if (migrate_attempts > 10) {
957 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
961 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
963 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
964 result->value.dptr = NULL;
966 if ((result->value.dsize != 0)
967 && !(result->value.dptr = (uint8 *)talloc_memdup(
968 result, ctdb_data.dptr + sizeof(crec->header),
969 result->value.dsize))) {
970 DEBUG(0, ("talloc failed\n"));
974 SAFE_FREE(ctdb_data.dptr);
/*
 * fetch_locked entry point of the db_context: unwraps the private
 * db_ctdb_ctx and forwards to fetch_locked_internal(), passing the
 * database's persistent flag along.
 */
979 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
983 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
986 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
990 fetch (unlocked, no migration) operation on ctdb
/*
 * Unlocked fetch of a record.
 *
 * Inside a transaction the call is routed to db_ctdb_transaction_fetch().
 * Otherwise the local tdb is tried first: a record that is at least
 * header-sized and passes the dmaster test is answered locally, with the
 * bytes after the ctdb_ltdb_header duplicated on mem_ctx for the caller and
 * the malloc'ed tdb buffer released. In every other case ctdbd_fetch() asks
 * the ctdb daemon for the record; a failure there is logged at level 5.
 */
992 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
993 TDB_DATA key, TDB_DATA *data)
995 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1000 if (ctx->transaction) {
1001 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1004 /* try a direct fetch */
1005 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1008 * See if we have a valid record and we are the dmaster. If so, we can
1009 * take the shortcut and just return it.
1010 * we bypass the dmaster check for persistent databases
1012 if ((ctdb_data.dptr != NULL) &&
1013 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1015 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1016 /* we are the dmaster - avoid the ctdb protocol op */
1018 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1019 if (data->dsize == 0) {
1020 SAFE_FREE(ctdb_data.dptr);
1025 data->dptr = (uint8 *)talloc_memdup(
1026 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1029 SAFE_FREE(ctdb_data.dptr);
1031 if (data->dptr == NULL) {
1037 SAFE_FREE(ctdb_data.dptr);
1039 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1040 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1041 if (!NT_STATUS_IS_OK(status)) {
1042 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * Context handed to the traverse callbacks: the database being traversed
 * and the caller's per-record function. The callbacks also read
 * state->private_data, whose declaration is not visible in this excerpt.
 */
1049 struct traverse_state {
1050 struct db_context *db;
1051 int (*fn)(struct db_record *rec, void *private_data);
/*
 * Per-record callback passed to ctdbd_traverse() by db_ctdb_traverse().
 *
 * Fetches the record locked via db_ctdb_fetch_locked() on a temporary talloc
 * context and calls the user function only for records with a non-empty
 * value. Freeing the temporary context at the end releases the record.
 * The return value of the user function is discarded here.
 */
1055 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1057 struct traverse_state *state = (struct traverse_state *)private_data;
1058 struct db_record *rec;
1059 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1060 /* we have to give them a locked record to prevent races */
1061 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1062 if (rec && rec->value.dsize > 0) {
1063 state->fn(rec, state->private_data);
1065 talloc_free(tmp_ctx);
/*
 * Per-record callback passed to tdb_traverse() for persistent databases.
 *
 * Works like traverse_callback(): the record is fetched locked on a
 * temporary talloc context and the user function is called only for
 * records with a non-empty value. Here the result of the user function is
 * kept in ret.
 */
1068 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1071 struct traverse_state *state = (struct traverse_state *)private_data;
1072 struct db_record *rec;
1073 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1075 /* we have to give them a locked record to prevent races */
1076 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1077 if (rec && rec->value.dsize > 0) {
1078 ret = state->fn(rec, state->private_data);
1080 talloc_free(tmp_ctx);
/*
 * traverse entry point of the db_context (records are handed out locked).
 *
 * Persistent databases are walked directly on the local tdb with
 * tdb_traverse() and traverse_persistent_callback; all other databases go
 * through ctdbd_traverse() with traverse_callback.
 * NOTE(review): no result of ctdbd_traverse() is used in the visible code.
 */
1084 static int db_ctdb_traverse(struct db_context *db,
1085 int (*fn)(struct db_record *rec,
1086 void *private_data),
1089 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1090 struct db_ctdb_ctx);
1091 struct traverse_state state;
1095 state.private_data = private_data;
1097 if (db->persistent) {
1098 /* for persistent databases we don't need to do a ctdb traverse,
1099 we can do a faster local traverse */
1100 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1104 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
/*
 * Store callback given to records of a read-only traverse: every store is
 * refused with NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1108 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1110 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Delete callback given to records of a read-only traverse: every delete is
 * refused with NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1113 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1115 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Per-record callback passed to ctdbd_traverse() by db_ctdb_traverse_read().
 *
 * Hands the user function a record that lives on the stack, with the
 * refusing store/delete callbacks and the database as private_data.
 * No lock is taken. The result of the user function is discarded.
 * NOTE(review): the lines that fill in rec.key and rec.value are not visible
 * in this excerpt.
 */
1118 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1120 struct traverse_state *state = (struct traverse_state *)private_data;
1121 struct db_record rec;
1124 rec.store = db_ctdb_store_deny;
1125 rec.delete_rec = db_ctdb_delete_deny;
1126 rec.private_data = state->db;
1127 state->fn(&rec, state->private_data);
/*
 * Per-record callback passed to tdb_traverse_read() for persistent
 * databases.
 *
 * Builds a stack record with the refusing store/delete callbacks. A value
 * no larger than a ctdb_ltdb_header is treated as a deleted record;
 * otherwise the header is stripped from the value (pointer advanced, size
 * reduced) and the user function's result is returned.
 * NOTE(review): the lines that fill in rec.key and rec.value are not visible
 * in this excerpt.
 */
1130 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1133 struct traverse_state *state = (struct traverse_state *)private_data;
1134 struct db_record rec;
1137 rec.store = db_ctdb_store_deny;
1138 rec.delete_rec = db_ctdb_delete_deny;
1139 rec.private_data = state->db;
1141 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1142 /* a deleted record */
1145 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1146 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1148 return state->fn(&rec, state->private_data);
/*
 * traverse_read entry point of the db_context (records cannot be modified).
 *
 * Persistent databases are walked directly on the local tdb with
 * tdb_traverse_read() and traverse_persistent_callback_read; all other
 * databases go through ctdbd_traverse() with traverse_read_callback.
 * NOTE(review): no result of ctdbd_traverse() is used in the visible code.
 */
1151 static int db_ctdb_traverse_read(struct db_context *db,
1152 int (*fn)(struct db_record *rec,
1153 void *private_data),
1156 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1157 struct db_ctdb_ctx);
1158 struct traverse_state state;
1162 state.private_data = private_data;
1164 if (db->persistent) {
1165 /* for persistent databases we don't need to do a ctdb traverse,
1166 we can do a faster local traverse */
1167 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1170 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
/*
 * get_seqnum entry point of the db_context: returns the sequence number of
 * the local tdb as reported by tdb_get_seqnum().
 */
1174 static int db_ctdb_get_seqnum(struct db_context *db)
1176 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1177 struct db_ctdb_ctx);
1178 return tdb_get_seqnum(ctx->wtdb->tdb);
/*
 * Open a ctdb-backed database and return a db_context wired to the
 * functions in this file.
 *
 * Steps visible below: give up when clustering is disabled, allocate the
 * context on mem_ctx and its private db_ctdb_ctx as a child, attach to the
 * database through ctdbd to learn its db_id, ask ctdbd for the path of the
 * local tdb file and open that file O_RDWR. A database is treated as
 * persistent when TDB_CLEAR_IF_FIRST is absent from tdb_flags. Of the
 * caller's flags only TDB_SEQNUM is passed on to the tdb open. When
 * open_flags contains O_CREAT, mode is applied to the file with chmod().
 * On the visible failure paths the whole context is released with
 * TALLOC_FREE(result).
 *
 * NOTE(review): the result of ctdbd_dbpath() is used without a visible NULL
 * check, and the return value of chmod() is ignored.
 */
1181 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1183 int hash_size, int tdb_flags,
1184 int open_flags, mode_t mode)
1186 struct db_context *result;
1187 struct db_ctdb_ctx *db_ctdb;
1190 if (!lp_clustering()) {
1191 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1195 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1196 DEBUG(0, ("talloc failed\n"));
1197 TALLOC_FREE(result);
1201 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1202 DEBUG(0, ("talloc failed\n"));
1203 TALLOC_FREE(result);
1207 db_ctdb->transaction = NULL;
1209 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1210 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1211 TALLOC_FREE(result);
1215 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1217 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1219 /* only pass through specific flags */
1220 tdb_flags &= TDB_SEQNUM;
1222 /* honor permissions if user has specified O_CREAT */
1223 if (open_flags & O_CREAT) {
1224 chmod(db_path, mode);
1227 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1228 if (db_ctdb->wtdb == NULL) {
1229 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1230 TALLOC_FREE(result);
1233 talloc_free(db_path);
1235 result->private_data = (void *)db_ctdb;
1236 result->fetch_locked = db_ctdb_fetch_locked;
1237 result->fetch = db_ctdb_fetch;
1238 result->traverse = db_ctdb_traverse;
1239 result->traverse_read = db_ctdb_traverse_read;
1240 result->get_seqnum = db_ctdb_get_seqnum;
1241 result->transaction_start = db_ctdb_transaction_start;
1242 result->transaction_commit = db_ctdb_transaction_commit;
1243 result->transaction_cancel = db_ctdb_transaction_cancel;
1245 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1246 name, db_ctdb->db_id));