1f6af437cfa145353ceacc80bea7416436b20496
[ira/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /**
79  * fetch a record from the tdb, separating out the header
80  * information and returning the body of the record.
81  */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TALLOC_CTX *mem_ctx,
86                                    TDB_DATA *data)
87 {
88         TDB_DATA rec;
89         NTSTATUS status;
90
91         rec = tdb_fetch(db->wtdb->tdb, key);
92         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93                 status = NT_STATUS_NOT_FOUND;
94                 if (data) {
95                         ZERO_STRUCTP(data);
96                 }
97                 if (header) {
98                         header->dmaster = (uint32_t)-1;
99                         header->rsn = 0;
100                 }
101                 goto done;
102         }
103
104         if (header) {
105                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106         }
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 if (data->dsize == 0) {
111                         data->dptr = NULL;
112                 } else {
113                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114                                         rec.dptr
115                                          + sizeof(struct ctdb_ltdb_header),
116                                         data->dsize);
117                         if (data->dptr == NULL) {
118                                 status = NT_STATUS_NO_MEMORY;
119                                 goto done;
120                         }
121                 }
122         }
123
124         status = NT_STATUS_OK;
125
126 done:
127         SAFE_FREE(rec.dptr);
128         return status;
129 }
130
131 /*
132  * Store a record together with the ctdb record header
133  * in the local copy of the database.
134  */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136                                    TDB_DATA key,
137                                    struct ctdb_ltdb_header *header,
138                                    TDB_DATA data)
139 {
140         TALLOC_CTX *tmp_ctx = talloc_stackframe();
141         TDB_DATA rec;
142         int ret;
143
144         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147         if (rec.dptr == NULL) {
148                 talloc_free(tmp_ctx);
149                 return NT_STATUS_NO_MEMORY;
150         }
151
152         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157         talloc_free(tmp_ctx);
158
159         return (ret == 0) ? NT_STATUS_OK
160                           : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162 }
163
164 /*
165   form a ctdb_rec_data record from a key/data pair
166
167   note that header may be NULL. If not NULL then it is included in the data portion
168   of the record
169  */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
171                                                   TDB_DATA key, 
172                                                   struct ctdb_ltdb_header *header,
173                                                   TDB_DATA data)
174 {
175         size_t length;
176         struct ctdb_rec_data *d;
177
178         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
179                 data.dsize + (header?sizeof(*header):0);
180         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181         if (d == NULL) {
182                 return NULL;
183         }
184         d->length = length;
185         d->reqid = reqid;
186         d->keylen = key.dsize;
187         memcpy(&d->data[0], key.dptr, key.dsize);
188         if (header) {
189                 d->datalen = data.dsize + sizeof(*header);
190                 memcpy(&d->data[key.dsize], header, sizeof(*header));
191                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192         } else {
193                 d->datalen = data.dsize;
194                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195         }
196         return d;
197 }
198
199
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
202                                                struct ctdb_marshall_buffer *m,
203                                                uint64_t db_id,
204                                                uint32_t reqid,
205                                                TDB_DATA key,
206                                                struct ctdb_ltdb_header *header,
207                                                TDB_DATA data)
208 {
209         struct ctdb_rec_data *r;
210         size_t m_size, r_size;
211         struct ctdb_marshall_buffer *m2 = NULL;
212
213         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214         if (r == NULL) {
215                 talloc_free(m);
216                 return NULL;
217         }
218
219         if (m == NULL) {
220                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222                 if (m == NULL) {
223                         goto done;
224                 }
225                 m->db_id = db_id;
226         }
227
228         m_size = talloc_get_size(m);
229         r_size = talloc_get_size(r);
230
231         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232                 mem_ctx, m,  m_size + r_size);
233         if (m2 == NULL) {
234                 talloc_free(m);
235                 goto done;
236         }
237
238         memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240         m2->count++;
241
242 done:
243         talloc_free(r);
244         return m2;
245 }
246
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249 {
250         TDB_DATA data;
251         data.dptr = (uint8_t *)m;
252         data.dsize = talloc_get_size(m);
253         return data;
254 }
255
256 /* 
257    loop over a marshalling buffer 
258
259      - pass r==NULL to start
260      - loop the number of times indicated by m->count
261 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263                                                      uint32_t *reqid,
264                                                      struct ctdb_ltdb_header *header,
265                                                      TDB_DATA *key, TDB_DATA *data)
266 {
267         if (r == NULL) {
268                 r = (struct ctdb_rec_data *)&m->data[0];
269         } else {
270                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271         }
272
273         if (reqid != NULL) {
274                 *reqid = r->reqid;
275         }
276
277         if (key != NULL) {
278                 key->dptr   = &r->data[0];
279                 key->dsize  = r->keylen;
280         }
281         if (data != NULL) {
282                 data->dptr  = &r->data[r->keylen];
283                 data->dsize = r->datalen;
284                 if (header != NULL) {
285                         data->dptr += sizeof(*header);
286                         data->dsize -= sizeof(*header);
287                 }
288         }
289
290         if (header != NULL) {
291                 if (r->datalen < sizeof(*header)) {
292                         return NULL;
293                 }
294                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295         }
296
297         return r;
298 }
299
300
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
302 {
303         int32_t status;
304         NTSTATUS ret;
305         TDB_DATA indata;
306
307         indata.dptr = (uint8_t *)&db_id;
308         indata.dsize = sizeof(db_id);
309
310         ret = ctdbd_control_local(messaging_ctdbd_connection(),
311                                   CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312                                   indata, NULL, NULL, &status);
313
314         if (!NT_STATUS_IS_OK(ret)) {
315                 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316                 return -1;
317         }
318
319         return status;
320 }
321
322
323 /**
324  * CTDB transaction destructor
325  */
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
327 {
328         tdb_transaction_cancel(h->ctx->wtdb->tdb);
329         return 0;
330 }
331
332 /**
333  * start a transaction on a ctdb database:
334  * - lock the transaction lock key
335  * - start the tdb transaction
336  */
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
338 {
339         struct db_record *rh;
340         struct db_ctdb_rec *crec;
341         TDB_DATA key;
342         TALLOC_CTX *tmp_ctx;
343         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344         int ret;
345         struct db_ctdb_ctx *ctx = h->ctx;
346         TDB_DATA data;
347         pid_t pid;
348         NTSTATUS status;
349         struct ctdb_ltdb_header header;
350         int32_t transaction_status;
351
352         key.dptr = (uint8_t *)discard_const(keyname);
353         key.dsize = strlen(keyname);
354
355 again:
356         tmp_ctx = talloc_new(h);
357
358         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359         if (rh == NULL) {
360                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
361                 talloc_free(tmp_ctx);
362                 return -1;
363         }
364         crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
365
366         transaction_status = db_ctdb_transaction_active(ctx->db_id);
367         if (transaction_status == 1) {
368                 unsigned long int usec = (1000 + random()) % 100000;
369                 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370                           "Re-trying after %lu microseconds...",
371                           ctx->db_id, usec));
372                 talloc_free(tmp_ctx);
373                 usleep(usec);
374                 goto again;
375         }
376
377         /*
378          * store the pid in the database:
379          * it is not enought that the node is dmaster...
380          */
381         pid = getpid();
382         data.dptr = (unsigned char *)&pid;
383         data.dsize = sizeof(pid_t);
384         status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
385         if (!NT_STATUS_IS_OK(status)) {
386                 DEBUG(0, (__location__ " Failed to store pid in transaction "
387                           "record: %s\n", nt_errstr(status)));
388                 talloc_free(tmp_ctx);
389                 return -1;
390         }
391
392         talloc_free(rh);
393
394         ret = tdb_transaction_start(ctx->wtdb->tdb);
395         if (ret != 0) {
396                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
397                 talloc_free(tmp_ctx);
398                 return -1;
399         }
400
401         status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
402         if (!NT_STATUS_IS_OK(status)) {
403                 DEBUG(0, (__location__ " failed to refetch transaction lock "
404                           "record inside transaction: %s - retrying\n",
405                           nt_errstr(status)));
406                 tdb_transaction_cancel(ctx->wtdb->tdb);
407                 talloc_free(tmp_ctx);
408                 goto again;
409         }
410
411         if (header.dmaster != get_my_vnn()) {
412                 DEBUG(3, (__location__ " refetch transaction lock record : "
413                           "we are not dmaster any more "
414                           "(dmaster[%u] != my_vnn[%u]) - retrying\n",
415                           header.dmaster, get_my_vnn()));
416                 tdb_transaction_cancel(ctx->wtdb->tdb);
417                 talloc_free(tmp_ctx);
418                 goto again;
419         }
420
421         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
422                 tdb_transaction_cancel(ctx->wtdb->tdb);
423                 talloc_free(tmp_ctx);
424                 goto again;
425         }
426
427         talloc_free(tmp_ctx);
428
429         return 0;
430 }
431
432
433 /**
434  * CTDB dbwrap API: transaction_start function
435  * starts a transaction on a persistent database
436  */
437 static int db_ctdb_transaction_start(struct db_context *db)
438 {
439         struct db_ctdb_transaction_handle *h;
440         int ret;
441         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
442                                                         struct db_ctdb_ctx);
443
444         if (!db->persistent) {
445                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
446                          ctx->db_id));
447                 return -1;
448         }
449
450         if (ctx->transaction) {
451                 ctx->transaction->nesting++;
452                 return 0;
453         }
454
455         h = talloc_zero(db, struct db_ctdb_transaction_handle);
456         if (h == NULL) {
457                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
458                 return -1;
459         }
460
461         h->ctx = ctx;
462
463         ret = db_ctdb_transaction_fetch_start(h);
464         if (ret != 0) {
465                 talloc_free(h);
466                 return -1;
467         }
468
469         talloc_set_destructor(h, db_ctdb_transaction_destructor);
470
471         ctx->transaction = h;
472
473         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
474
475         return 0;
476 }
477
478
479
480 /*
481   fetch a record inside a transaction
482  */
483 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
484                                      TALLOC_CTX *mem_ctx, 
485                                      TDB_DATA key, TDB_DATA *data)
486 {
487         struct db_ctdb_transaction_handle *h = db->transaction;
488         NTSTATUS status;
489
490         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
491
492         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
493                 *data = tdb_null;
494         } else if (!NT_STATUS_IS_OK(status)) {
495                 return -1;
496         }
497
498         if (!h->in_replay) {
499                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
500                 if (h->m_all == NULL) {
501                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
502                         data->dsize = 0;
503                         talloc_free(data->dptr);
504                         return -1;
505                 }
506         }
507
508         return 0;
509 }
510
511
512 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
513 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
514
515 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
516                                                           TALLOC_CTX *mem_ctx,
517                                                           TDB_DATA key)
518 {
519         struct db_record *result;
520         TDB_DATA ctdb_data;
521
522         if (!(result = talloc(mem_ctx, struct db_record))) {
523                 DEBUG(0, ("talloc failed\n"));
524                 return NULL;
525         }
526
527         result->private_data = ctx->transaction;
528
529         result->key.dsize = key.dsize;
530         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
531         if (result->key.dptr == NULL) {
532                 DEBUG(0, ("talloc failed\n"));
533                 TALLOC_FREE(result);
534                 return NULL;
535         }
536
537         result->store = db_ctdb_store_transaction;
538         result->delete_rec = db_ctdb_delete_transaction;
539
540         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
541         if (ctdb_data.dptr == NULL) {
542                 /* create the record */
543                 result->value = tdb_null;
544                 return result;
545         }
546
547         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
548         result->value.dptr = NULL;
549
550         if ((result->value.dsize != 0)
551             && !(result->value.dptr = (uint8 *)talloc_memdup(
552                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
553                          result->value.dsize))) {
554                 DEBUG(0, ("talloc failed\n"));
555                 TALLOC_FREE(result);
556         }
557
558         SAFE_FREE(ctdb_data.dptr);
559
560         return result;
561 }
562
563 static int db_ctdb_record_destructor(struct db_record **recp)
564 {
565         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
566         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
567                 rec->private_data, struct db_ctdb_transaction_handle);
568         int ret = h->ctx->db->transaction_commit(h->ctx->db);
569         if (ret != 0) {
570                 DEBUG(0,(__location__ " transaction_commit failed\n"));
571         }
572         return 0;
573 }
574
575 /*
576   auto-create a transaction for persistent databases
577  */
578 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
579                                                          TALLOC_CTX *mem_ctx,
580                                                          TDB_DATA key)
581 {
582         int res;
583         struct db_record *rec, **recp;
584
585         res = db_ctdb_transaction_start(ctx->db);
586         if (res == -1) {
587                 return NULL;
588         }
589
590         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
591         if (rec == NULL) {
592                 ctx->db->transaction_cancel(ctx->db);           
593                 return NULL;
594         }
595
596         /* destroy this transaction when we release the lock */
597         recp = talloc(rec, struct db_record *);
598         if (recp == NULL) {
599                 ctx->db->transaction_cancel(ctx->db);
600                 talloc_free(rec);
601                 return NULL;
602         }
603         *recp = rec;
604         talloc_set_destructor(recp, db_ctdb_record_destructor);
605         return rec;
606 }
607
608
609 /*
610   stores a record inside a transaction
611  */
612 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
613                                      TDB_DATA key, TDB_DATA data)
614 {
615         TALLOC_CTX *tmp_ctx = talloc_new(h);
616         int ret;
617         TDB_DATA rec;
618         struct ctdb_ltdb_header header;
619         NTSTATUS status;
620
621         /* we need the header so we can update the RSN */
622         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
623         if (rec.dptr == NULL) {
624                 /* the record doesn't exist - create one with us as dmaster.
625                    This is only safe because we are in a transaction and this
626                    is a persistent database */
627                 ZERO_STRUCT(header);
628         } else {
629                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
630                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
631                 /* a special case, we are writing the same data that is there now */
632                 if (data.dsize == rec.dsize &&
633                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
634                         SAFE_FREE(rec.dptr);
635                         talloc_free(tmp_ctx);
636                         return 0;
637                 }
638                 SAFE_FREE(rec.dptr);
639         }
640
641         header.dmaster = get_my_vnn();
642         header.rsn++;
643
644         if (!h->in_replay) {
645                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
646                 if (h->m_all == NULL) {
647                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
648                         talloc_free(tmp_ctx);
649                         return -1;
650                 }
651         }
652
653         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
654         if (h->m_write == NULL) {
655                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
656                 talloc_free(tmp_ctx);
657                 return -1;
658         }
659
660         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
661         if (NT_STATUS_IS_OK(status)) {
662                 ret = 0;
663         } else {
664                 ret = -1;
665         }
666
667         talloc_free(tmp_ctx);
668
669         return ret;
670 }
671
672
673 /* 
674    a record store inside a transaction
675  */
676 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
677 {
678         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
679                 rec->private_data, struct db_ctdb_transaction_handle);
680         int ret;
681
682         ret = db_ctdb_transaction_store(h, rec->key, data);
683         if (ret != 0) {
684                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
685         }
686         return NT_STATUS_OK;
687 }
688
689 /* 
690    a record delete inside a transaction
691  */
692 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
693 {
694         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
695                 rec->private_data, struct db_ctdb_transaction_handle);
696         int ret;
697
698         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
699         if (ret != 0) {
700                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
701         }
702         return NT_STATUS_OK;
703 }
704
705
706 /*
707   replay a transaction
708  */
709 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
710 {
711         int ret, i;
712         struct ctdb_rec_data *rec = NULL;
713
714         h->in_replay = true;
715         talloc_free(h->m_write);
716         h->m_write = NULL;
717
718         ret = db_ctdb_transaction_fetch_start(h);
719         if (ret != 0) {
720                 return ret;
721         }
722
723         for (i=0;i<h->m_all->count;i++) {
724                 TDB_DATA key, data;
725
726                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
727                 if (rec == NULL) {
728                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
729                         goto failed;
730                 }
731
732                 if (rec->reqid == 0) {
733                         /* its a store */
734                         if (db_ctdb_transaction_store(h, key, data) != 0) {
735                                 goto failed;
736                         }
737                 } else {
738                         TDB_DATA data2;
739                         TALLOC_CTX *tmp_ctx = talloc_new(h);
740
741                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
742                                 talloc_free(tmp_ctx);
743                                 goto failed;
744                         }
745                         if (data2.dsize != data.dsize ||
746                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
747                                 /* the record has changed on us - we have to give up */
748                                 talloc_free(tmp_ctx);
749                                 goto failed;
750                         }
751                         talloc_free(tmp_ctx);
752                 }
753         }
754
755         return 0;
756
757 failed:
758         tdb_transaction_cancel(h->ctx->wtdb->tdb);
759         return -1;
760 }
761
762
763 /*
764   commit a transaction
765  */
766 static int db_ctdb_transaction_commit(struct db_context *db)
767 {
768         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
769                                                         struct db_ctdb_ctx);
770         NTSTATUS rets;
771         int ret;
772         int status;
773         int retries = 0;
774         struct db_ctdb_transaction_handle *h = ctx->transaction;
775         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
776
777         if (h == NULL) {
778                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
779                 return -1;
780         }
781
782         if (h->nested_cancel) {
783                 db->transaction_cancel(db);
784                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
785                 return -1;
786         }
787
788         if (h->nesting != 0) {
789                 h->nesting--;
790                 return 0;
791         }
792
793         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
794
795         talloc_set_destructor(h, NULL);
796
797         /* our commit strategy is quite complex.
798
799            - we first try to commit the changes to all other nodes
800
801            - if that works, then we commit locally and we are done
802
803            - if a commit on another node fails, then we need to cancel
804              the transaction, then restart the transaction (thus
805              opening a window of time for a pending recovery to
806              complete), then replay the transaction, checking all the
807              reads and writes (checking that reads give the same data,
808              and writes succeed). Then we retry the transaction to the
809              other nodes
810         */
811
812 again:
813         if (h->m_write == NULL) {
814                 /* no changes were made, potentially after a retry */
815                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
816                 talloc_free(h);
817                 ctx->transaction = NULL;
818                 return 0;
819         }
820
821         /* tell ctdbd to commit to the other nodes */
822         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
823                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
824                                    h->ctx->db_id, 0,
825                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
826         if (!NT_STATUS_IS_OK(rets) || status != 0) {
827                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
828                 sleep(1);
829
830                 if (!NT_STATUS_IS_OK(rets)) {
831                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
832                 } else {
833                         /* work out what error code we will give if we 
834                            have to fail the operation */
835                         switch ((enum ctdb_trans2_commit_error)status) {
836                         case CTDB_TRANS2_COMMIT_SUCCESS:
837                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
838                         case CTDB_TRANS2_COMMIT_TIMEOUT:
839                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
840                                 break;
841                         case CTDB_TRANS2_COMMIT_ALLFAIL:
842                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
843                                 break;
844                         }
845                 }
846
847                 if (++retries == 5) {
848                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
849                                  h->ctx->db_id, retries, (unsigned)failure_control));
850                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
851                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
852                                             tdb_null, NULL, NULL, NULL);
853                         h->ctx->transaction = NULL;
854                         talloc_free(h);
855                         ctx->transaction = NULL;
856                         return -1;                      
857                 }
858
859                 if (ctdb_replay_transaction(h) != 0) {
860                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
861                                  (unsigned)failure_control));
862                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
863                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
864                                             tdb_null, NULL, NULL, NULL);
865                         h->ctx->transaction = NULL;
866                         talloc_free(h);
867                         ctx->transaction = NULL;
868                         return -1;
869                 }
870                 goto again;
871         } else {
872                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
873         }
874
875         /* do the real commit locally */
876         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
877         if (ret != 0) {
878                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
879                          (unsigned)failure_control));
880                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
881                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
882                 h->ctx->transaction = NULL;
883                 talloc_free(h);
884                 return ret;
885         }
886
887         /* tell ctdbd that we are finished with our local commit */
888         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
889                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
890                             tdb_null, NULL, NULL, NULL);
891         h->ctx->transaction = NULL;
892         talloc_free(h);
893         return 0;
894 }
895
896
897 /*
898   cancel a transaction
899  */
900 static int db_ctdb_transaction_cancel(struct db_context *db)
901 {
902         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
903                                                         struct db_ctdb_ctx);
904         struct db_ctdb_transaction_handle *h = ctx->transaction;
905
906         if (h == NULL) {
907                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
908                 return -1;
909         }
910
911         if (h->nesting != 0) {
912                 h->nesting--;
913                 h->nested_cancel = true;
914                 return 0;
915         }
916
917         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
918
919         ctx->transaction = NULL;
920         talloc_free(h);
921         return 0;
922 }
923
924
925 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
926 {
927         struct db_ctdb_rec *crec = talloc_get_type_abort(
928                 rec->private_data, struct db_ctdb_rec);
929
930         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
931 }
932
933
934
935 static NTSTATUS db_ctdb_delete(struct db_record *rec)
936 {
937         TDB_DATA data;
938
939         /*
940          * We have to store the header with empty data. TODO: Fix the
941          * tdb-level cleanup
942          */
943
944         ZERO_STRUCT(data);
945
946         return db_ctdb_store(rec, data, 0);
947
948 }
949
950 static int db_ctdb_record_destr(struct db_record* data)
951 {
952         struct db_ctdb_rec *crec = talloc_get_type_abort(
953                 data->private_data, struct db_ctdb_rec);
954
955         DEBUG(10, (DEBUGLEVEL > 10
956                    ? "Unlocking db %u key %s\n"
957                    : "Unlocking db %u key %.20s\n",
958                    (int)crec->ctdb_ctx->db_id,
959                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
960                               data->key.dsize)));
961
962         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
963                 DEBUG(0, ("tdb_chainunlock failed\n"));
964                 return -1;
965         }
966
967         return 0;
968 }
969
970 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
971                                                TALLOC_CTX *mem_ctx,
972                                                TDB_DATA key,
973                                                bool persistent)
974 {
975         struct db_record *result;
976         struct db_ctdb_rec *crec;
977         NTSTATUS status;
978         TDB_DATA ctdb_data;
979         int migrate_attempts = 0;
980
981         if (!(result = talloc(mem_ctx, struct db_record))) {
982                 DEBUG(0, ("talloc failed\n"));
983                 return NULL;
984         }
985
986         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
987                 DEBUG(0, ("talloc failed\n"));
988                 TALLOC_FREE(result);
989                 return NULL;
990         }
991
992         result->private_data = (void *)crec;
993         crec->ctdb_ctx = ctx;
994
995         result->key.dsize = key.dsize;
996         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
997         if (result->key.dptr == NULL) {
998                 DEBUG(0, ("talloc failed\n"));
999                 TALLOC_FREE(result);
1000                 return NULL;
1001         }
1002
1003         /*
1004          * Do a blocking lock on the record
1005          */
1006 again:
1007
1008         if (DEBUGLEVEL >= 10) {
1009                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1010                 DEBUG(10, (DEBUGLEVEL > 10
1011                            ? "Locking db %u key %s\n"
1012                            : "Locking db %u key %.20s\n",
1013                            (int)crec->ctdb_ctx->db_id, keystr));
1014                 TALLOC_FREE(keystr);
1015         }
1016
1017         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1018                 DEBUG(3, ("tdb_chainlock failed\n"));
1019                 TALLOC_FREE(result);
1020                 return NULL;
1021         }
1022
1023         result->store = db_ctdb_store;
1024         result->delete_rec = db_ctdb_delete;
1025         talloc_set_destructor(result, db_ctdb_record_destr);
1026
1027         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1028
1029         /*
1030          * See if we have a valid record and we are the dmaster. If so, we can
1031          * take the shortcut and just return it.
1032          */
1033
1034         if ((ctdb_data.dptr == NULL) ||
1035             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1036             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1037 #if 0
1038             || (random() % 2 != 0)
1039 #endif
1040 ) {
1041                 SAFE_FREE(ctdb_data.dptr);
1042                 tdb_chainunlock(ctx->wtdb->tdb, key);
1043                 talloc_set_destructor(result, NULL);
1044
1045                 migrate_attempts += 1;
1046
1047                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1048                            ctdb_data.dptr, ctdb_data.dptr ?
1049                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1050                            get_my_vnn()));
1051
1052                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1053                 if (!NT_STATUS_IS_OK(status)) {
1054                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1055                                   nt_errstr(status)));
1056                         TALLOC_FREE(result);
1057                         return NULL;
1058                 }
1059                 /* now its migrated, try again */
1060                 goto again;
1061         }
1062
1063         if (migrate_attempts > 10) {
1064                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1065                           migrate_attempts));
1066         }
1067
1068         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1069
1070         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1071         result->value.dptr = NULL;
1072
1073         if ((result->value.dsize != 0)
1074             && !(result->value.dptr = (uint8 *)talloc_memdup(
1075                          result, ctdb_data.dptr + sizeof(crec->header),
1076                          result->value.dsize))) {
1077                 DEBUG(0, ("talloc failed\n"));
1078                 TALLOC_FREE(result);
1079         }
1080
1081         SAFE_FREE(ctdb_data.dptr);
1082
1083         return result;
1084 }
1085
1086 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1087                                               TALLOC_CTX *mem_ctx,
1088                                               TDB_DATA key)
1089 {
1090         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1091                                                         struct db_ctdb_ctx);
1092
1093         if (ctx->transaction != NULL) {
1094                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1095         }
1096
1097         if (db->persistent) {
1098                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1099         }
1100
1101         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1102 }
1103
1104 /*
1105   fetch (unlocked, no migration) operation on ctdb
1106  */
1107 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1108                          TDB_DATA key, TDB_DATA *data)
1109 {
1110         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1111                                                         struct db_ctdb_ctx);
1112         NTSTATUS status;
1113         TDB_DATA ctdb_data;
1114
1115         if (ctx->transaction) {
1116                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1117         }
1118
1119         /* try a direct fetch */
1120         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1121
1122         /*
1123          * See if we have a valid record and we are the dmaster. If so, we can
1124          * take the shortcut and just return it.
1125          * we bypass the dmaster check for persistent databases
1126          */
1127         if ((ctdb_data.dptr != NULL) &&
1128             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1129             (db->persistent ||
1130              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1131                 /* we are the dmaster - avoid the ctdb protocol op */
1132
1133                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1134                 if (data->dsize == 0) {
1135                         SAFE_FREE(ctdb_data.dptr);
1136                         data->dptr = NULL;
1137                         return 0;
1138                 }
1139
1140                 data->dptr = (uint8 *)talloc_memdup(
1141                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1142                         data->dsize);
1143
1144                 SAFE_FREE(ctdb_data.dptr);
1145
1146                 if (data->dptr == NULL) {
1147                         return -1;
1148                 }
1149                 return 0;
1150         }
1151
1152         SAFE_FREE(ctdb_data.dptr);
1153
1154         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1155         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1156         if (!NT_STATUS_IS_OK(status)) {
1157                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1158                 return -1;
1159         }
1160
1161         return 0;
1162 }
1163
1164 struct traverse_state {
1165         struct db_context *db;
1166         int (*fn)(struct db_record *rec, void *private_data);
1167         void *private_data;
1168 };
1169
1170 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1171 {
1172         struct traverse_state *state = (struct traverse_state *)private_data;
1173         struct db_record *rec;
1174         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1175         /* we have to give them a locked record to prevent races */
1176         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1177         if (rec && rec->value.dsize > 0) {
1178                 state->fn(rec, state->private_data);
1179         }
1180         talloc_free(tmp_ctx);
1181 }
1182
1183 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1184                                         void *private_data)
1185 {
1186         struct traverse_state *state = (struct traverse_state *)private_data;
1187         struct db_record *rec;
1188         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1189         int ret = 0;
1190         /* we have to give them a locked record to prevent races */
1191         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1192         if (rec && rec->value.dsize > 0) {
1193                 ret = state->fn(rec, state->private_data);
1194         }
1195         talloc_free(tmp_ctx);
1196         return ret;
1197 }
1198
1199 static int db_ctdb_traverse(struct db_context *db,
1200                             int (*fn)(struct db_record *rec,
1201                                       void *private_data),
1202                             void *private_data)
1203 {
1204         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1205                                                         struct db_ctdb_ctx);
1206         struct traverse_state state;
1207
1208         state.db = db;
1209         state.fn = fn;
1210         state.private_data = private_data;
1211
1212         if (db->persistent) {
1213                 /* for persistent databases we don't need to do a ctdb traverse,
1214                    we can do a faster local traverse */
1215                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1216         }
1217
1218
1219         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1220         return 0;
1221 }
1222
1223 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1224 {
1225         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1226 }
1227
1228 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1229 {
1230         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1231 }
1232
1233 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1234 {
1235         struct traverse_state *state = (struct traverse_state *)private_data;
1236         struct db_record rec;
1237         rec.key = key;
1238         rec.value = data;
1239         rec.store = db_ctdb_store_deny;
1240         rec.delete_rec = db_ctdb_delete_deny;
1241         rec.private_data = state->db;
1242         state->fn(&rec, state->private_data);
1243 }
1244
1245 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1246                                         void *private_data)
1247 {
1248         struct traverse_state *state = (struct traverse_state *)private_data;
1249         struct db_record rec;
1250         rec.key = kbuf;
1251         rec.value = dbuf;
1252         rec.store = db_ctdb_store_deny;
1253         rec.delete_rec = db_ctdb_delete_deny;
1254         rec.private_data = state->db;
1255
1256         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1257                 /* a deleted record */
1258                 return 0;
1259         }
1260         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1261         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1262
1263         return state->fn(&rec, state->private_data);
1264 }
1265
1266 static int db_ctdb_traverse_read(struct db_context *db,
1267                                  int (*fn)(struct db_record *rec,
1268                                            void *private_data),
1269                                  void *private_data)
1270 {
1271         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1272                                                         struct db_ctdb_ctx);
1273         struct traverse_state state;
1274
1275         state.db = db;
1276         state.fn = fn;
1277         state.private_data = private_data;
1278
1279         if (db->persistent) {
1280                 /* for persistent databases we don't need to do a ctdb traverse,
1281                    we can do a faster local traverse */
1282                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1283         }
1284
1285         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1286         return 0;
1287 }
1288
1289 static int db_ctdb_get_seqnum(struct db_context *db)
1290 {
1291         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1292                                                         struct db_ctdb_ctx);
1293         return tdb_get_seqnum(ctx->wtdb->tdb);
1294 }
1295
1296 static int db_ctdb_get_flags(struct db_context *db)
1297 {
1298         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1299                                                         struct db_ctdb_ctx);
1300         return tdb_get_flags(ctx->wtdb->tdb);
1301 }
1302
1303 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1304                                 const char *name,
1305                                 int hash_size, int tdb_flags,
1306                                 int open_flags, mode_t mode)
1307 {
1308         struct db_context *result;
1309         struct db_ctdb_ctx *db_ctdb;
1310         char *db_path;
1311
1312         if (!lp_clustering()) {
1313                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1314                 return NULL;
1315         }
1316
1317         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1318                 DEBUG(0, ("talloc failed\n"));
1319                 TALLOC_FREE(result);
1320                 return NULL;
1321         }
1322
1323         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1324                 DEBUG(0, ("talloc failed\n"));
1325                 TALLOC_FREE(result);
1326                 return NULL;
1327         }
1328
1329         db_ctdb->transaction = NULL;
1330         db_ctdb->db = result;
1331
1332         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1333                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1334                 TALLOC_FREE(result);
1335                 return NULL;
1336         }
1337
1338         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1339
1340         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1341
1342         /* only pass through specific flags */
1343         tdb_flags &= TDB_SEQNUM;
1344
1345         /* honor permissions if user has specified O_CREAT */
1346         if (open_flags & O_CREAT) {
1347                 chmod(db_path, mode);
1348         }
1349
1350         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1351         if (db_ctdb->wtdb == NULL) {
1352                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1353                 TALLOC_FREE(result);
1354                 return NULL;
1355         }
1356         talloc_free(db_path);
1357
1358         result->private_data = (void *)db_ctdb;
1359         result->fetch_locked = db_ctdb_fetch_locked;
1360         result->fetch = db_ctdb_fetch;
1361         result->traverse = db_ctdb_traverse;
1362         result->traverse_read = db_ctdb_traverse_read;
1363         result->get_seqnum = db_ctdb_get_seqnum;
1364         result->get_flags = db_ctdb_get_flags;
1365         result->transaction_start = db_ctdb_transaction_start;
1366         result->transaction_commit = db_ctdb_transaction_commit;
1367         result->transaction_cancel = db_ctdb_transaction_cancel;
1368
1369         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1370                  name, db_ctdb->db_id));
1371
1372         return result;
1373 }
1374 #endif