2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #ifdef CLUSTER_SUPPORT
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of one open transaction on a ctdb-backed database.
 * db_ctdb_transaction_start() allocates it and publishes it as
 * ctx->transaction.
 * NOTE(review): other code in this file reads h->nesting and
 * h->nested_cancel; those members are not visible in this listing.
 */
26 struct db_ctdb_transaction_handle {
/* database context this transaction operates on */
27 struct db_ctdb_ctx *ctx;
30 * we store the reads and writes done under a transaction:
31 * - one list stores both reads and writes (m_all),
32 * - the other just writes (m_write)
/* m_all is walked by ctdb_replay_transaction(); m_write is what commit hands to ctdbd */
34 struct ctdb_marshall_buffer *m_all;
35 struct ctdb_marshall_buffer *m_write;
/*
 * Members of the per-database private context (used elsewhere in this file
 * as struct db_ctdb_ctx).
 * NOTE(review): the opening line of the struct and the db_id member that the
 * rest of the file reads are not visible in this listing.
 */
/* generic dbwrap context that owns this private data */
41 struct db_context *db;
/* wrapper around the local tdb copy of the database */
42 struct tdb_wrap *wtdb;
/* currently open transaction, or NULL when none is active */
44 struct db_ctdb_transaction_handle *transaction;
/*
 * Members of the private data attached to a locked record (used elsewhere in
 * this file as struct db_ctdb_rec). NOTE(review): the opening line of the
 * struct is not visible in this listing.
 */
/* database the record belongs to */
48 struct db_ctdb_ctx *ctdb_ctx;
/* ctdb header read when the record was locked; reused when it is stored */
49 struct ctdb_ltdb_header header;
/*
 * Forward declaration: db_ctdb_transaction_fetch_start() calls
 * fetch_locked_internal() before its definition further down.
 * Callers pass (ctx, mem_ctx, key, persistent flag); the remaining parameter
 * lines of this prototype are not visible in this listing.
 */
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
/*
 * Translate the last error recorded on a tdb handle (tdb_error()) into an
 * NTSTATUS. Three results are visible: NT_STATUS_OBJECT_NAME_COLLISION,
 * NT_STATUS_OBJECT_NAME_NOT_FOUND and NT_STATUS_INTERNAL_DB_CORRUPTION.
 * NOTE(review): the logic that picks one of them from tret is not visible in
 * this listing - presumably a switch with the corruption code as the
 * fallback; confirm against the full source.
 */
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
60 enum TDB_ERROR tret = tdb_error(tdb);
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
79 * fetch a record from the tdb, separating out the header
80 * information and returning the body of the record.
/*
 * Read the raw record stored under a key in the local tdb and split it into
 * the leading ctdb header and the payload that follows it.
 *
 * A record shorter than struct ctdb_ltdb_header yields NT_STATUS_NOT_FOUND.
 * Otherwise the header is copied out and a non-empty payload is duplicated
 * onto mem_ctx (NT_STATUS_NO_MEMORY if that copy fails); status is then set
 * to NT_STATUS_OK.
 *
 * NOTE(review): several lines are missing from this listing (remaining
 * parameters, NULL guards, cleanup and the return).
 * db_ctdb_transaction_fetch() passes header == NULL, so the header writes
 * below must be guarded in the full source - confirm. Also confirm that the
 * buffer returned by tdb_fetch() is released on every path.
 */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
84 struct ctdb_ltdb_header *header,
/* the stored blob is: ctdb header, then the caller-visible data */
91 rec = tdb_fetch(db->wtdb->tdb, key);
92 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93 status = NT_STATUS_NOT_FOUND;
/* no usable record: report an all-ones dmaster to the caller */
98 header->dmaster = (uint32_t)-1;
105 *header = *(struct ctdb_ltdb_header *)rec.dptr;
/* payload length is whatever follows the header */
109 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110 if (data->dsize == 0) {
113 data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
115 + sizeof(struct ctdb_ltdb_header),
117 if (data->dptr == NULL) {
118 status = NT_STATUS_NO_MEMORY;
124 status = NT_STATUS_OK;
132 * Store a record together with the ctdb record header
133 * in the local copy of the database.
/*
 * Write data under a key into the local tdb, prefixed with the given ctdb
 * record header. The combined header+data image is assembled in a buffer
 * allocated on a temporary talloc stackframe and stored with TDB_REPLACE.
 *
 * Returns NT_STATUS_NO_MEMORY if the temporary buffer cannot be allocated,
 * NT_STATUS_OK when tdb_store() returns 0, and otherwise the tdb error
 * mapped through tdb_error_to_ntstatus().
 *
 * NOTE(review): the delete paths in this file pass tdb_null as data; if that
 * value has a NULL dptr the second memcpy() is called with a NULL source and
 * length 0, which is formally undefined - worth guarding.
 * NOTE(review): data.dsize + sizeof(header) is not checked for wrap-around.
 */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
137 struct ctdb_ltdb_header *header,
140 TALLOC_CTX *tmp_ctx = talloc_stackframe();
144 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
147 if (rec.dptr == NULL) {
148 talloc_free(tmp_ctx);
149 return NT_STATUS_NO_MEMORY;
/* header first, payload immediately after it */
152 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
155 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
/* the temporary image is no longer needed once tdb_store() has returned */
157 talloc_free(tmp_ctx);
159 return (ret == 0) ? NT_STATUS_OK
160 : tdb_error_to_ntstatus(db->wtdb->tdb);
165 form a ctdb_rec_data record from a key/data pair
167 note that header may be NULL. If not NULL then it is included in the data portion
/*
 * Build one struct ctdb_rec_data blob on mem_ctx from a key/data pair.
 *
 * The allocation covers the fixed part of the struct up to its data member
 * plus the key, the data and - only when header is non-NULL - one ctdb
 * header. Inside d->data the key bytes come first; they are followed either
 * by header + data (datalen counts both) or by the data alone.
 *
 * NOTE(review): the allocation-failure check, the stores of the total length
 * and of reqid, the branch on header and the return are not visible in this
 * listing. Other code in this file reads r->length and rec->reqid from
 * records built here.
 */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
172 struct ctdb_ltdb_header *header,
176 struct ctdb_rec_data *d;
178 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
179 data.dsize + (header?sizeof(*header):0);
180 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
186 d->keylen = key.dsize;
187 memcpy(&d->data[0], key.dptr, key.dsize);
/* variant with a header: key | header | data */
189 d->datalen = data.dsize + sizeof(*header);
190 memcpy(&d->data[key.dsize], header, sizeof(*header));
191 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
/* variant without a header: key | data */
193 d->datalen = data.dsize;
194 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
200 /* helper function for marshalling multiple records */
/*
 * Append one key/data record to a marshall buffer and hand back the
 * (possibly reallocated) buffer. Callers in this file treat a NULL result as
 * failure.
 *
 * The record is first marshalled onto talloc_tos(). A buffer containing only
 * the fixed part of struct ctdb_marshall_buffer is created with
 * talloc_zero_size() - presumably when m is NULL on entry; confirm. The
 * buffer is then grown by the talloc size of the record and the record bytes
 * are copied to what used to be the end of the buffer.
 *
 * NOTE(review): error checks, the update of the record count and the release
 * of the temporary record are not visible in this listing.
 */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
202 struct ctdb_marshall_buffer *m,
206 struct ctdb_ltdb_header *header,
209 struct ctdb_rec_data *r;
210 size_t m_size, r_size;
211 struct ctdb_marshall_buffer *m2 = NULL;
213 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
220 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
/* sizes come from talloc itself, so no separate length bookkeeping is needed */
228 m_size = talloc_get_size(m);
229 r_size = talloc_get_size(r);
231 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232 mem_ctx, m, m_size + r_size);
/* append the new record right behind the previous contents */
238 memcpy(m_size + (uint8_t *)m2, r, r_size);
247 /* we've finished marshalling, return a data blob with the marshalled records */
/*
 * Describe a marshall buffer as a TDB_DATA blob: dptr aliases the buffer
 * itself (nothing is copied) and dsize is its talloc size.
 */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
251 data.dptr = (uint8_t *)m;
252 data.dsize = talloc_get_size(m);
257 loop over a marshalling buffer
259 - pass r==NULL to start
260 - loop the number of times indicated by m->count
/*
 * Step through the records packed into a marshall buffer.
 *
 * The first record starts at m->data[0]; every further record starts
 * r->length bytes after the previous one. For the current record, *key is
 * pointed at the key bytes and *data at the bytes following the key. When a
 * header is requested the data window skips sizeof(*header) and the header
 * is copied out of the record. All pointers handed back alias the buffer.
 *
 * NOTE(review): the branch choosing first versus next record, the guards on
 * key and data, and the returns are not visible in this listing.
 * NOTE(review): data->dsize is reduced by sizeof(*header) before the visible
 * r->datalen < sizeof(*header) check; confirm a short record cannot leave a
 * wrapped-around dsize behind.
 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
264 struct ctdb_ltdb_header *header,
265 TDB_DATA *key, TDB_DATA *data)
268 r = (struct ctdb_rec_data *)&m->data[0];
270 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
278 key->dptr = &r->data[0];
279 key->dsize = r->keylen;
282 data->dptr = &r->data[r->keylen];
283 data->dsize = r->datalen;
284 if (header != NULL) {
285 data->dptr += sizeof(*header);
286 data->dsize -= sizeof(*header);
290 if (header != NULL) {
291 if (r->datalen < sizeof(*header)) {
294 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
/*
 * Query the local ctdbd with CTDB_CONTROL_TRANS2_ACTIVE for the given
 * database id; the id itself is sent as the control payload. A failure of
 * the control call is logged at debug level 2.
 * db_ctdb_transaction_fetch_start() treats a result of 1 as "a transaction
 * is already active". NOTE(review): the return statements are not visible
 * in this listing.
 */
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
307 indata.dptr = (uint8_t *)&db_id;
308 indata.dsize = sizeof(db_id);
310 ret = ctdbd_control_local(messaging_ctdbd_connection(),
311 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312 indata, NULL, NULL, &status);
314 if (!NT_STATUS_IS_OK(ret)) {
315 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
324 * CTDB transaction destructor
/*
 * talloc destructor for a transaction handle: cancels the tdb transaction on
 * the local tdb. It is installed by db_ctdb_transaction_start() and removed
 * again at the start of a real commit.
 */
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 tdb_transaction_cancel(h->ctx->wtdb->tdb);
333 * start a transaction on a ctdb database:
334 * - lock the transaction lock key
335 * - start the tdb transaction
/*
 * Acquire the cluster-wide transaction lock record and open the local tdb
 * transaction for handle h.
 *
 * Visible sequence:
 *  1. fetch-lock the CTDB_TRANSACTION_LOCK_KEY record;
 *  2. ask ctdbd whether a transaction is already active and back off if so;
 *  3. store our pid in the lock record;
 *  4. start the tdb transaction;
 *  5. re-read the lock record inside the transaction and verify that this
 *     node is still dmaster and that the stored pid is still ours,
 *     cancelling the tdb transaction otherwise.
 *
 * NOTE(review): the retry control flow (sleep, jump back, returns) and the
 * assignment of pid are not visible in this listing.
 */
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
339 struct db_record *rh;
340 struct db_ctdb_rec *crec;
343 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
345 struct db_ctdb_ctx *ctx = h->ctx;
349 struct ctdb_ltdb_header header;
350 int32_t transaction_status;
/* the key is the lock name without its terminating NUL */
352 key.dptr = (uint8_t *)discard_const(keyname);
353 key.dsize = strlen(keyname);
/* scratch context; freed on every visible exit path */
356 tmp_ctx = talloc_new(h);
358 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
360 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
361 talloc_free(tmp_ctx);
364 crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
366 transaction_status = db_ctdb_transaction_active(ctx->db_id);
367 if (transaction_status == 1) {
/*
 * NOTE(review): the modulo is applied after adding 1000, so the delay is
 * 0..99999 usec rather than at least 1000 usec. The log message below also
 * has no space after the period and no trailing newline.
 */
368 unsigned long int usec = (1000 + random()) % 100000;
369 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370 "Re-trying after %lu microseconds...",
372 talloc_free(tmp_ctx);
378 * store the pid in the database:
379 * it is not enought that the node is dmaster...
382 data.dptr = (unsigned char *)&pid;
383 data.dsize = sizeof(pid_t);
/* keep the header obtained while the record was locked */
384 status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
385 if (!NT_STATUS_IS_OK(status)) {
386 DEBUG(0, (__location__ " Failed to store pid in transaction "
387 "record: %s\n", nt_errstr(status)));
388 talloc_free(tmp_ctx);
394 ret = tdb_transaction_start(ctx->wtdb->tdb);
396 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
397 talloc_free(tmp_ctx);
/* re-read under the tdb transaction to detect a change of ownership */
401 status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
402 if (!NT_STATUS_IS_OK(status)) {
403 DEBUG(0, (__location__ " failed to refetch transaction lock "
404 "record inside transaction: %s - retrying\n",
406 tdb_transaction_cancel(ctx->wtdb->tdb);
407 talloc_free(tmp_ctx);
411 if (header.dmaster != get_my_vnn()) {
412 DEBUG(3, (__location__ " refetch transaction lock record : "
413 "we are not dmaster any more "
414 "(dmaster[%u] != my_vnn[%u]) - retrying\n",
415 header.dmaster, get_my_vnn()));
416 tdb_transaction_cancel(ctx->wtdb->tdb);
417 talloc_free(tmp_ctx);
/* someone else rewrote the lock record: the stored pid must still be ours */
421 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
422 tdb_transaction_cancel(ctx->wtdb->tdb);
423 talloc_free(tmp_ctx);
427 talloc_free(tmp_ctx);
434 * CTDB dbwrap API: transaction_start function
435 * starts a transaction on a persistent database
/*
 * dbwrap transaction_start hook.
 *
 * Non-persistent databases are rejected with a level 0 log message. If a
 * transaction is already open only its nesting counter is incremented.
 * Otherwise a zeroed handle is allocated on db, the transaction lock is
 * taken via db_ctdb_transaction_fetch_start(), the cancel-on-free destructor
 * is installed and the handle is published in ctx->transaction.
 *
 * NOTE(review): the initialisation of h->ctx, the error handling after
 * fetch_start and all return statements are not visible in this listing.
 */
437 static int db_ctdb_transaction_start(struct db_context *db)
439 struct db_ctdb_transaction_handle *h;
441 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
444 if (!db->persistent) {
445 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
/* nested start: just count it */
450 if (ctx->transaction) {
451 ctx->transaction->nesting++;
455 h = talloc_zero(db, struct db_ctdb_transaction_handle);
457 DEBUG(0,(__location__ " oom for transaction handle\n"));
463 ret = db_ctdb_transaction_fetch_start(h);
/* from here on, freeing h cancels the tdb transaction */
469 talloc_set_destructor(h, db_ctdb_transaction_destructor);
471 ctx->transaction = h;
473 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
481 fetch a record inside a transaction
/*
 * Read a record while a transaction is open and remember the read.
 *
 * The payload is fetched from the local tdb without its header (header
 * argument NULL) and the key/value pair is appended to h->m_all with
 * reqid 1, which ctdb_replay_transaction() later treats as "a read to
 * re-verify". If the append fails the fetched data is freed again.
 *
 * NOTE(review): what happens for NT_STATUS_NOT_FOUND, for other errors, and
 * the return values are not visible in this listing.
 */
483 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
485 TDB_DATA key, TDB_DATA *data)
487 struct db_ctdb_transaction_handle *h = db->transaction;
490 status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
492 if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
494 } else if (!NT_STATUS_IS_OK(status)) {
/* record the read so a replay can check it still yields the same data */
499 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
500 if (h->m_all == NULL) {
501 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
503 talloc_free(data->dptr);
/*
 * Forward declarations of the record callbacks that
 * db_ctdb_fetch_locked_transaction() installs; both are defined further down.
 */
512 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
513 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
/*
 * fetch_locked variant used while a transaction is open.
 *
 * Builds a db_record on mem_ctx whose private data is the open transaction
 * handle and whose store/delete callbacks go through the transaction. The
 * key is duplicated onto the record. The current value is read from the
 * local tdb: a missing record gives an empty value (tdb_null), otherwise the
 * bytes after the ctdb header are duplicated onto the record.
 *
 * NOTE(review): no check is visible that ctdb_data.dsize is at least
 * sizeof(struct ctdb_ltdb_header) before the subtraction below; a shorter
 * record would wrap value.dsize around. fetch_locked_internal() does test
 * this - confirm and align.
 * NOTE(review): failure returns and the final return are not visible in this
 * listing.
 */
515 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
519 struct db_record *result;
522 if (!(result = talloc(mem_ctx, struct db_record))) {
523 DEBUG(0, ("talloc failed\n"));
527 result->private_data = ctx->transaction;
529 result->key.dsize = key.dsize;
530 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
531 if (result->key.dptr == NULL) {
532 DEBUG(0, ("talloc failed\n"));
537 result->store = db_ctdb_store_transaction;
538 result->delete_rec = db_ctdb_delete_transaction;
540 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
541 if (ctdb_data.dptr == NULL) {
542 /* create the record */
543 result->value = tdb_null;
/* strip the ctdb header from what the caller gets to see */
547 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
548 result->value.dptr = NULL;
550 if ((result->value.dsize != 0)
551 && !(result->value.dptr = (uint8 *)talloc_memdup(
552 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
553 result->value.dsize))) {
554 DEBUG(0, ("talloc failed\n"));
/* release the buffer obtained from tdb_fetch() */
558 SAFE_FREE(ctdb_data.dptr);
/*
 * Destructor for the helper pointer that db_ctdb_fetch_locked_persistent()
 * hangs off a record: when it runs, the transaction that was opened
 * automatically for the record is committed through the database's
 * transaction_commit hook. A failing commit is logged at level 0.
 * NOTE(review): the test of ret and the return are not visible in this
 * listing.
 */
563 static int db_ctdb_record_destructor(struct db_record **recp)
565 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
566 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
567 rec->private_data, struct db_ctdb_transaction_handle);
568 int ret = h->ctx->db->transaction_commit(h->ctx->db);
570 DEBUG(0,(__location__ " transaction_commit failed\n"));
576 auto-create a transaction for persistent databases
/*
 * fetch_locked for persistent databases when no transaction is open: a
 * transaction is started implicitly, the record is fetched through the
 * transactional path, and a small child object with
 * db_ctdb_record_destructor() is attached to the record so that the
 * transaction is committed when the record goes away. Both visible failure
 * paths cancel the transaction again.
 * NOTE(review): the failure tests, the assignment through recp and the
 * returns are not visible in this listing.
 */
578 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
583 struct db_record *rec, **recp;
585 res = db_ctdb_transaction_start(ctx->db);
590 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
592 ctx->db->transaction_cancel(ctx->db);
596 /* destroy this transaction when we release the lock */
597 recp = talloc(rec, struct db_record *);
599 ctx->db->transaction_cancel(ctx->db);
604 talloc_set_destructor(recp, db_ctdb_record_destructor);
610 stores a record inside a transaction
/*
 * Store a key/data pair as part of the open transaction h.
 *
 * The existing record is read to obtain its ctdb header. Writing data that
 * is byte-identical to what is already stored is treated as a no-op.
 * Otherwise this node is made dmaster in the header, the write is appended
 * to h->m_all (without header, reqid 0) and to h->m_write (with header), and
 * the record is written to the local tdb.
 *
 * NOTE(review): the header is copied out of the fetched record without a
 * visible check that rec.dsize is at least sizeof(struct ctdb_ltdb_header).
 * NOTE(review): header initialisation for a new record, the RSN update
 * mentioned in the comment below, release of rec.dptr, and the return values
 * are not visible in this listing.
 */
612 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
613 TDB_DATA key, TDB_DATA data)
615 TALLOC_CTX *tmp_ctx = talloc_new(h);
618 struct ctdb_ltdb_header header;
621 /* we need the header so we can update the RSN */
622 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
623 if (rec.dptr == NULL) {
624 /* the record doesn't exist - create one with us as dmaster.
625 This is only safe because we are in a transaction and this
626 is a persistent database */
629 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
630 rec.dsize -= sizeof(struct ctdb_ltdb_header);
631 /* a special case, we are writing the same data that is there now */
632 if (data.dsize == rec.dsize &&
633 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
635 talloc_free(tmp_ctx);
641 header.dmaster = get_my_vnn();
/* log of everything done in the transaction, used for replay */
645 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
646 if (h->m_all == NULL) {
647 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
648 talloc_free(tmp_ctx);
/* writes only, including the header: this is what commit sends to ctdbd */
653 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
654 if (h->m_write == NULL) {
655 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
656 talloc_free(tmp_ctx);
660 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
661 if (NT_STATUS_IS_OK(status)) {
667 talloc_free(tmp_ctx);
674 a record store inside a transaction
/*
 * db_record store callback used inside a transaction: forwards the record's
 * key and the new data to db_ctdb_transaction_store(). The visible failure
 * path reports the last tdb error as an NTSTATUS.
 * The flag argument is not used in the visible lines.
 * NOTE(review): the test of ret and the success return are not visible in
 * this listing.
 */
676 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
678 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
679 rec->private_data, struct db_ctdb_transaction_handle);
682 ret = db_ctdb_transaction_store(h, rec->key, data);
684 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
690 a record delete inside a transaction
/*
 * db_record delete callback used inside a transaction: a delete is expressed
 * as a transactional store of tdb_null (empty data) under the record's key.
 * The visible failure path reports the last tdb error as an NTSTATUS.
 * NOTE(review): the test of ret and the success return are not visible in
 * this listing.
 */
692 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
694 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
695 rec->private_data, struct db_ctdb_transaction_handle);
698 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
700 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
/*
 * Re-run a transaction after a failed commit attempt.
 *
 * The list of pending writes is discarded, the transaction lock and tdb
 * transaction are re-acquired, and every entry of h->m_all is replayed in
 * order: entries with reqid 0 are writes and are stored again; the other
 * entries are reads, which are fetched again and compared byte-for-byte with
 * what was read originally. A read that now returns different data makes the
 * replay give up. The last visible statement cancels the tdb transaction -
 * presumably the shared failure exit; confirm.
 *
 * NOTE(review): resetting of h->m_write after the free, the error jumps and
 * the return values are not visible in this listing.
 */
709 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
712 struct ctdb_rec_data *rec = NULL;
715 talloc_free(h->m_write);
718 ret = db_ctdb_transaction_fetch_start(h);
723 for (i=0;i<h->m_all->count;i++) {
726 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
728 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
/* reqid 0 marks a write, see db_ctdb_transaction_store() */
732 if (rec->reqid == 0) {
734 if (db_ctdb_transaction_store(h, key, data) != 0) {
739 TALLOC_CTX *tmp_ctx = talloc_new(h);
741 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
742 talloc_free(tmp_ctx);
745 if (data2.dsize != data.dsize ||
746 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
747 /* the record has changed on us - we have to give up */
748 talloc_free(tmp_ctx);
751 talloc_free(tmp_ctx);
758 tdb_transaction_cancel(h->ctx->wtdb->tdb);
/*
 * dbwrap transaction_commit hook.
 *
 * Visible behaviour:
 *  - a commit without an open transaction is logged at level 0;
 *  - after a nested cancel the whole transaction is cancelled instead;
 *  - a non-zero nesting level is handled before the real commit (details not
 *    visible in this listing);
 *  - with no recorded writes the tdb transaction is simply cancelled;
 *  - otherwise the marshalled writes are sent to ctdbd (TRANS2_COMMIT on the
 *    first attempt, TRANS2_COMMIT_RETRY afterwards). On failure the tdb
 *    transaction is cancelled and the transaction is replayed; this is given
 *    up when the retry counter reaches 5 or when the replay fails, and
 *    ctdbd is then told via failure_control;
 *  - on success the local tdb transaction is committed and ctdbd is told
 *    TRANS2_FINISHED.
 *
 * NOTE(review): many lines (declarations, the retry jump, freeing of the
 * handle, all returns) are not visible in this listing.
 */
766 static int db_ctdb_transaction_commit(struct db_context *db)
768 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
774 struct db_ctdb_transaction_handle *h = ctx->transaction;
/* control sent to ctdbd if the commit has to be abandoned */
775 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
778 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
/* an inner level asked for cancel, so the outer commit must not succeed */
782 if (h->nested_cancel) {
783 db->transaction_cancel(db);
784 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
788 if (h->nesting != 0) {
793 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
/* from here on the tdb transaction is ended explicitly, not by the destructor */
795 talloc_set_destructor(h, NULL);
797 /* our commit strategy is quite complex.
799 - we first try to commit the changes to all other nodes
801 - if that works, then we commit locally and we are done
803 - if a commit on another node fails, then we need to cancel
804 the transaction, then restart the transaction (thus
805 opening a window of time for a pending recovery to
806 complete), then replay the transaction, checking all the
807 reads and writes (checking that reads give the same data,
808 and writes succeed). Then we retry the transaction to the
813 if (h->m_write == NULL) {
814 /* no changes were made, potentially after a retry */
815 tdb_transaction_cancel(h->ctx->wtdb->tdb);
817 ctx->transaction = NULL;
821 /* tell ctdbd to commit to the other nodes */
822 rets = ctdbd_control_local(messaging_ctdbd_connection(),
823 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
825 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
826 if (!NT_STATUS_IS_OK(rets) || status != 0) {
827 tdb_transaction_cancel(h->ctx->wtdb->tdb);
830 if (!NT_STATUS_IS_OK(rets)) {
831 failure_control = CTDB_CONTROL_TRANS2_ERROR;
833 /* work out what error code we will give if we
834 have to fail the operation */
835 switch ((enum ctdb_trans2_commit_error)status) {
836 case CTDB_TRANS2_COMMIT_SUCCESS:
837 case CTDB_TRANS2_COMMIT_SOMEFAIL:
838 case CTDB_TRANS2_COMMIT_TIMEOUT:
839 failure_control = CTDB_CONTROL_TRANS2_ERROR;
841 case CTDB_TRANS2_COMMIT_ALLFAIL:
842 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
/* bounded number of attempts before giving up for good */
847 if (++retries == 5) {
848 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
849 h->ctx->db_id, retries, (unsigned)failure_control));
850 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
851 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
852 tdb_null, NULL, NULL, NULL);
853 h->ctx->transaction = NULL;
855 ctx->transaction = NULL;
/* re-acquire the lock and re-apply everything recorded in m_all */
859 if (ctdb_replay_transaction(h) != 0) {
860 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
861 (unsigned)failure_control));
862 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
863 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
864 tdb_null, NULL, NULL, NULL);
865 h->ctx->transaction = NULL;
867 ctx->transaction = NULL;
872 failure_control = CTDB_CONTROL_TRANS2_ERROR;
875 /* do the real commit locally */
876 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
878 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
879 (unsigned)failure_control));
880 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
881 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
882 h->ctx->transaction = NULL;
887 /* tell ctdbd that we are finished with our local commit */
888 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
889 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
890 tdb_null, NULL, NULL, NULL);
891 h->ctx->transaction = NULL;
/*
 * dbwrap transaction_cancel hook.
 *
 * A cancel without an open transaction is logged at level 0. At a non-zero
 * nesting level the cancel is only remembered in h->nested_cancel, which
 * makes the eventual outer commit fail. At the outermost level the handle is
 * detached from the context.
 * NOTE(review): the nesting decrement, the release of the handle (whose
 * destructor cancels the tdb transaction) and the returns are not visible in
 * this listing.
 */
900 static int db_ctdb_transaction_cancel(struct db_context *db)
902 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
904 struct db_ctdb_transaction_handle *h = ctx->transaction;
907 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
911 if (h->nesting != 0) {
913 h->nested_cancel = true;
917 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
919 ctx->transaction = NULL;
/*
 * db_record store callback for records locked outside a transaction: writes
 * the new data to the local tdb under the record's key, reusing the ctdb
 * header captured when the record was locked. The flag argument is unused.
 */
925 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
927 struct db_ctdb_rec *crec = talloc_get_type_abort(
928 rec->private_data, struct db_ctdb_rec);
930 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
/*
 * db_record delete callback for records locked outside a transaction.
 * Deletion is implemented as a store of the header with empty data, via
 * db_ctdb_store() with flag 0.
 * NOTE(review): the declaration and initialisation of data are not visible
 * in this listing.
 */
935 static NTSTATUS db_ctdb_delete(struct db_record *rec)
940 * We have to store the header with empty data. TODO: Fix the
946 return db_ctdb_store(rec, data, 0);
/*
 * talloc destructor for a record returned by fetch_locked_internal():
 * releases the tdb chain lock taken on the record's key and logs at level 0
 * if that fails. At debug level 10 the key is logged in hex (truncated to 20
 * characters unless the level is above 10).
 * NOTE(review): the return statements are not visible in this listing.
 */
950 static int db_ctdb_record_destr(struct db_record* data)
952 struct db_ctdb_rec *crec = talloc_get_type_abort(
953 data->private_data, struct db_ctdb_rec);
955 DEBUG(10, (DEBUGLEVEL > 10
956 ? "Unlocking db %u key %s\n"
957 : "Unlocking db %u key %.20s\n",
958 (int)crec->ctdb_ctx->db_id,
959 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
962 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
963 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Lock a record in the local tdb and make sure this node is its dmaster.
 *
 * A db_record with attached db_ctdb_rec private data is built on mem_ctx and
 * the key is duplicated onto it. The tdb chain for the key is locked and the
 * record is read. If there is no record, it is too short to hold a ctdb
 * header, or another node is dmaster, the lock is dropped again, ctdbd is
 * asked to migrate the record here, and the attempt is repeated. Once the
 * record is local its header is kept in crec->header and the bytes after the
 * header become the record value. The chain lock is released by the record's
 * destructor.
 *
 * NOTE(review): remaining parameters, the retry jump, the persistent-flag
 * handling, failure returns and the final return are not visible in this
 * listing.
 */
970 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
975 struct db_record *result;
976 struct db_ctdb_rec *crec;
979 int migrate_attempts = 0;
981 if (!(result = talloc(mem_ctx, struct db_record))) {
982 DEBUG(0, ("talloc failed\n"));
986 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
987 DEBUG(0, ("talloc failed\n"));
992 result->private_data = (void *)crec;
993 crec->ctdb_ctx = ctx;
995 result->key.dsize = key.dsize;
996 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
997 if (result->key.dptr == NULL) {
998 DEBUG(0, ("talloc failed\n"));
1004 * Do a blocking lock on the record
/* the hex dump is only built when it will actually be printed */
1008 if (DEBUGLEVEL >= 10) {
1009 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1010 DEBUG(10, (DEBUGLEVEL > 10
1011 ? "Locking db %u key %s\n"
1012 : "Locking db %u key %.20s\n",
1013 (int)crec->ctdb_ctx->db_id, keystr));
1014 TALLOC_FREE(keystr);
1017 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1018 DEBUG(3, ("tdb_chainlock failed\n"));
1019 TALLOC_FREE(result);
1023 result->store = db_ctdb_store;
1024 result->delete_rec = db_ctdb_delete;
/* from now on freeing the record also drops the chain lock */
1025 talloc_set_destructor(result, db_ctdb_record_destr);
1027 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1030 * See if we have a valid record and we are the dmaster. If so, we can
1031 * take the shortcut and just return it.
1034 if ((ctdb_data.dptr == NULL) ||
1035 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1036 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
/*
 * NOTE(review): the random() clause below looks like test-only fault
 * injection. The lines around it are missing from this listing - presumably
 * it is compiled out in the full source; confirm.
 */
1038 || (random() % 2 != 0)
/* not usable locally: unlock by hand and disarm the unlock destructor */
1041 SAFE_FREE(ctdb_data.dptr);
1042 tdb_chainunlock(ctx->wtdb->tdb, key);
1043 talloc_set_destructor(result, NULL);
1045 migrate_attempts += 1;
/*
 * NOTE(review): ctdb_data.dptr was already passed to SAFE_FREE above. If that
 * macro resets the pointer (presumably it does), this message can only ever
 * print a NULL pointer and -1; log before freeing instead.
 */
1047 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1048 ctdb_data.dptr, ctdb_data.dptr ?
1049 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1052 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1053 if (!NT_STATUS_IS_OK(status)) {
1054 DEBUG(5, ("ctdb_migrate failed: %s\n",
1055 nt_errstr(status)));
1056 TALLOC_FREE(result);
1059 /* now its migrated, try again */
/* many migration round trips are logged at level 0 */
1063 if (migrate_attempts > 10) {
1064 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1068 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1070 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1071 result->value.dptr = NULL;
1073 if ((result->value.dsize != 0)
1074 && !(result->value.dptr = (uint8 *)talloc_memdup(
1075 result, ctdb_data.dptr + sizeof(crec->header),
1076 result->value.dsize))) {
1077 DEBUG(0, ("talloc failed\n"));
1078 TALLOC_FREE(result);
/* release the buffer obtained from tdb_fetch() */
1081 SAFE_FREE(ctdb_data.dptr);
/*
 * dbwrap fetch_locked hook. Dispatches to one of three implementations:
 *  - an open transaction: the transactional fetch;
 *  - a persistent database without transaction: the variant that opens a
 *    transaction automatically;
 *  - otherwise: the chain-lock / migrate path.
 * NOTE(review): because persistent databases return earlier, the
 * db->persistent argument of the final call is always false at that point.
 */
1086 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1087 TALLOC_CTX *mem_ctx,
1090 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1091 struct db_ctdb_ctx);
1093 if (ctx->transaction != NULL) {
1094 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1097 if (db->persistent) {
1098 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1101 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1105 fetch (unlocked, no migration) operation on ctdb
/*
 * dbwrap fetch hook (no lock is taken, no migration is requested).
 *
 * Inside a transaction the read goes through db_ctdb_transaction_fetch().
 * Otherwise the local tdb is tried first: if it holds a record with a full
 * ctdb header that passes the dmaster test, the payload after the header is
 * duplicated onto mem_ctx and handed back directly. In all other cases
 * ctdbd is asked to fetch the record; a failure of that call is logged at
 * level 5.
 *
 * NOTE(review): one line of the local-shortcut condition (per the comment
 * below, the bypass for persistent databases), the handling of an empty
 * payload and all return statements are not visible in this listing.
 */
1107 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1108 TDB_DATA key, TDB_DATA *data)
1110 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1111 struct db_ctdb_ctx);
1115 if (ctx->transaction) {
1116 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1119 /* try a direct fetch */
1120 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1123 * See if we have a valid record and we are the dmaster. If so, we can
1124 * take the shortcut and just return it.
1125 * we bypass the dmaster check for persistent databases
1127 if ((ctdb_data.dptr != NULL) &&
1128 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1130 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1131 /* we are the dmaster - avoid the ctdb protocol op */
1133 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1134 if (data->dsize == 0) {
1135 SAFE_FREE(ctdb_data.dptr);
1140 data->dptr = (uint8 *)talloc_memdup(
1141 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1144 SAFE_FREE(ctdb_data.dptr);
1146 if (data->dptr == NULL) {
/* local copy not usable: drop it before asking ctdbd */
1152 SAFE_FREE(ctdb_data.dptr);
1154 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1155 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1156 if (!NT_STATUS_IS_OK(status)) {
1157 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
/*
 * Context handed to the traverse callbacks below.
 * NOTE(review): the callbacks also read state->private_data; that member is
 * not visible in this listing.
 */
1164 struct traverse_state {
/* database being traversed */
1165 struct db_context *db;
/* caller-supplied function invoked for each record */
1166 int (*fn)(struct db_record *rec, void *private_data);
/*
 * Per-record callback for read-write traversal of a non-persistent database
 * (passed to ctdbd_traverse()). Each key is fetched again under lock through
 * db_ctdb_fetch_locked() on a temporary context; records with an empty value
 * are skipped. The result of the user function is ignored here. Freeing the
 * temporary context releases the locked record.
 */
1170 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1172 struct traverse_state *state = (struct traverse_state *)private_data;
1173 struct db_record *rec;
1174 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1175 /* we have to give them a locked record to prevent races */
1176 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1177 if (rec && rec->value.dsize > 0) {
1178 state->fn(rec, state->private_data);
1180 talloc_free(tmp_ctx);
/*
 * Per-record callback for read-write traversal of a persistent database
 * (passed to tdb_traverse()). Each key is fetched again under lock through
 * db_ctdb_fetch_locked() on a temporary context; records with an empty value
 * are skipped. Unlike traverse_callback(), the result of the user function
 * is captured in ret.
 * NOTE(review): the declaration of ret and the return are not visible in
 * this listing.
 */
1183 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1186 struct traverse_state *state = (struct traverse_state *)private_data;
1187 struct db_record *rec;
1188 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1190 /* we have to give them a locked record to prevent races */
1191 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1192 if (rec && rec->value.dsize > 0) {
1193 ret = state->fn(rec, state->private_data);
1195 talloc_free(tmp_ctx);
/*
 * dbwrap traverse hook (records may be modified by the callback).
 * Persistent databases are walked directly in the local tdb with
 * tdb_traverse(); all others are walked through ctdbd_traverse().
 * NOTE(review): the initialisation of state.db and state.fn and the return
 * of the non-persistent path are not visible in this listing.
 */
1199 static int db_ctdb_traverse(struct db_context *db,
1200 int (*fn)(struct db_record *rec,
1201 void *private_data),
1204 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1205 struct db_ctdb_ctx);
1206 struct traverse_state state;
1210 state.private_data = private_data;
1212 if (db->persistent) {
1213 /* for persistent databases we don't need to do a ctdb traverse,
1214 we can do a faster local traverse */
1215 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1219 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
/*
 * Store callback installed on records handed out by the read-only
 * traversals: every attempt to write is refused with
 * NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1223 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1225 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Delete callback installed on records handed out by the read-only
 * traversals: every attempt to delete is refused with
 * NT_STATUS_MEDIA_WRITE_PROTECTED.
 */
1228 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1230 return NT_STATUS_MEDIA_WRITE_PROTECTED;
/*
 * Per-record callback for read-only traversal of a non-persistent database
 * (passed to ctdbd_traverse()). A stack-allocated db_record with
 * write-refusing callbacks is handed to the user function; the function's
 * result is ignored.
 * NOTE(review): the assignments of rec.key and rec.value are not visible in
 * this listing.
 */
1233 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1235 struct traverse_state *state = (struct traverse_state *)private_data;
1236 struct db_record rec;
1239 rec.store = db_ctdb_store_deny;
1240 rec.delete_rec = db_ctdb_delete_deny;
1241 rec.private_data = state->db;
1242 state->fn(&rec, state->private_data);
/*
 * Per-record callback for read-only traversal of a persistent database
 * (passed to tdb_traverse_read()). A stack-allocated db_record with
 * write-refusing callbacks is prepared; values no longer than the ctdb
 * header are treated as deleted records. For all others the header is
 * stripped from the value window before the user function is called, and
 * that function's result is returned.
 * NOTE(review): the assignments of rec.key and rec.value and the return for
 * deleted records are not visible in this listing.
 */
1245 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1248 struct traverse_state *state = (struct traverse_state *)private_data;
1249 struct db_record rec;
1252 rec.store = db_ctdb_store_deny;
1253 rec.delete_rec = db_ctdb_delete_deny;
1254 rec.private_data = state->db;
1256 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1257 /* a deleted record */
/* expose only the payload that follows the ctdb header */
1260 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1261 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1263 return state->fn(&rec, state->private_data);
/*
 * dbwrap traverse_read hook (records are handed out read-only).
 * Persistent databases are walked directly in the local tdb with
 * tdb_traverse_read(); all others are walked through ctdbd_traverse().
 * NOTE(review): the initialisation of state.db and state.fn and the return
 * of the non-persistent path are not visible in this listing.
 */
1266 static int db_ctdb_traverse_read(struct db_context *db,
1267 int (*fn)(struct db_record *rec,
1268 void *private_data),
1271 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1272 struct db_ctdb_ctx);
1273 struct traverse_state state;
1277 state.private_data = private_data;
1279 if (db->persistent) {
1280 /* for persistent databases we don't need to do a ctdb traverse,
1281 we can do a faster local traverse */
1282 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1285 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
/*
 * dbwrap get_seqnum hook: reports the sequence number of the local tdb.
 */
1289 static int db_ctdb_get_seqnum(struct db_context *db)
1291 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1292 struct db_ctdb_ctx);
1293 return tdb_get_seqnum(ctx->wtdb->tdb);
/*
 * dbwrap get_flags hook: reports the flags of the local tdb.
 */
1296 static int db_ctdb_get_flags(struct db_context *db)
1298 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1299 struct db_ctdb_ctx);
1300 return tdb_get_flags(ctx->wtdb->tdb);
/*
 * Open a database through ctdb and return a dbwrap context for it.
 *
 * When clustering is disabled nothing is opened. Otherwise a db_context and
 * its private db_ctdb_ctx are allocated on mem_ctx, the database is attached
 * in ctdbd (which yields the db id), the path of the local tdb copy is
 * obtained from ctdbd and that tdb is opened read-write. A database is
 * considered persistent when TDB_CLEAR_IF_FIRST is not set in tdb_flags; of
 * the caller's tdb flags only TDB_SEQNUM is passed on to the local open.
 * Finally all dbwrap hooks are filled in. Every visible failure path frees
 * the partially built context.
 *
 * NOTE(review): db_path is used by chmod() and tdb_wrap_open() without a
 * visible NULL check, and the result of chmod() is ignored.
 * NOTE(review): the name parameter line, the failure returns and the final
 * return are not visible in this listing.
 */
1303 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1305 int hash_size, int tdb_flags,
1306 int open_flags, mode_t mode)
1308 struct db_context *result;
1309 struct db_ctdb_ctx *db_ctdb;
1312 if (!lp_clustering()) {
1313 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1317 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1318 DEBUG(0, ("talloc failed\n"));
/* result is NULL on this path, so this free has nothing to release */
1319 TALLOC_FREE(result);
/* the private context is a talloc child of result and is freed with it */
1323 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1324 DEBUG(0, ("talloc failed\n"));
1325 TALLOC_FREE(result);
1329 db_ctdb->transaction = NULL;
1330 db_ctdb->db = result;
1332 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1333 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1334 TALLOC_FREE(result);
1338 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
/* must be evaluated before tdb_flags is masked below */
1340 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1342 /* only pass through specific flags */
1343 tdb_flags &= TDB_SEQNUM;
1345 /* honor permissions if user has specified O_CREAT */
1346 if (open_flags & O_CREAT) {
1347 chmod(db_path, mode);
1350 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1351 if (db_ctdb->wtdb == NULL) {
1352 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1353 TALLOC_FREE(result);
1356 talloc_free(db_path);
/* wire up the dbwrap method table */
1358 result->private_data = (void *)db_ctdb;
1359 result->fetch_locked = db_ctdb_fetch_locked;
1360 result->fetch = db_ctdb_fetch;
1361 result->traverse = db_ctdb_traverse;
1362 result->traverse_read = db_ctdb_traverse_read;
1363 result->get_seqnum = db_ctdb_get_seqnum;
1364 result->get_flags = db_ctdb_get_flags;
1365 result->transaction_start = db_ctdb_transaction_start;
1366 result->transaction_commit = db_ctdb_transaction_commit;
1367 result->transaction_cancel = db_ctdb_transaction_cancel;
1369 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1370 name, db_ctdb->db_id));