s3: Fix explicit stat64 support
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /**
79  * fetch a record from the tdb, separating out the header
80  * information and returning the body of the record.
81  */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TALLOC_CTX *mem_ctx,
86                                    TDB_DATA *data)
87 {
88         TDB_DATA rec;
89         NTSTATUS status;
90
91         rec = tdb_fetch(db->wtdb->tdb, key);
92         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93                 status = NT_STATUS_NOT_FOUND;
94                 if (data) {
95                         ZERO_STRUCTP(data);
96                 }
97                 if (header) {
98                         header->dmaster = (uint32_t)-1;
99                         header->rsn = 0;
100                 }
101                 goto done;
102         }
103
104         if (header) {
105                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106         }
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 if (data->dsize == 0) {
111                         data->dptr = NULL;
112                 } else {
113                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114                                         rec.dptr
115                                          + sizeof(struct ctdb_ltdb_header),
116                                         data->dsize);
117                         if (data->dptr == NULL) {
118                                 status = NT_STATUS_NO_MEMORY;
119                                 goto done;
120                         }
121                 }
122         }
123
124         status = NT_STATUS_OK;
125
126 done:
127         SAFE_FREE(rec.dptr);
128         return status;
129 }
130
131 /*
132  * Store a record together with the ctdb record header
133  * in the local copy of the database.
134  */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136                                    TDB_DATA key,
137                                    struct ctdb_ltdb_header *header,
138                                    TDB_DATA data)
139 {
140         TALLOC_CTX *tmp_ctx = talloc_stackframe();
141         TDB_DATA rec;
142         int ret;
143
144         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147         if (rec.dptr == NULL) {
148                 talloc_free(tmp_ctx);
149                 return NT_STATUS_NO_MEMORY;
150         }
151
152         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157         talloc_free(tmp_ctx);
158
159         return (ret == 0) ? NT_STATUS_OK
160                           : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162 }
163
164 /*
165   form a ctdb_rec_data record from a key/data pair
166
167   note that header may be NULL. If not NULL then it is included in the data portion
168   of the record
169  */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
171                                                   TDB_DATA key, 
172                                                   struct ctdb_ltdb_header *header,
173                                                   TDB_DATA data)
174 {
175         size_t length;
176         struct ctdb_rec_data *d;
177
178         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
179                 data.dsize + (header?sizeof(*header):0);
180         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181         if (d == NULL) {
182                 return NULL;
183         }
184         d->length = length;
185         d->reqid = reqid;
186         d->keylen = key.dsize;
187         memcpy(&d->data[0], key.dptr, key.dsize);
188         if (header) {
189                 d->datalen = data.dsize + sizeof(*header);
190                 memcpy(&d->data[key.dsize], header, sizeof(*header));
191                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192         } else {
193                 d->datalen = data.dsize;
194                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195         }
196         return d;
197 }
198
199
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
202                                                struct ctdb_marshall_buffer *m,
203                                                uint64_t db_id,
204                                                uint32_t reqid,
205                                                TDB_DATA key,
206                                                struct ctdb_ltdb_header *header,
207                                                TDB_DATA data)
208 {
209         struct ctdb_rec_data *r;
210         size_t m_size, r_size;
211         struct ctdb_marshall_buffer *m2 = NULL;
212
213         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214         if (r == NULL) {
215                 talloc_free(m);
216                 return NULL;
217         }
218
219         if (m == NULL) {
220                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222                 if (m == NULL) {
223                         goto done;
224                 }
225                 m->db_id = db_id;
226         }
227
228         m_size = talloc_get_size(m);
229         r_size = talloc_get_size(r);
230
231         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232                 mem_ctx, m,  m_size + r_size);
233         if (m2 == NULL) {
234                 talloc_free(m);
235                 goto done;
236         }
237
238         memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240         m2->count++;
241
242 done:
243         talloc_free(r);
244         return m2;
245 }
246
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249 {
250         TDB_DATA data;
251         data.dptr = (uint8_t *)m;
252         data.dsize = talloc_get_size(m);
253         return data;
254 }
255
256 /* 
257    loop over a marshalling buffer 
258
259      - pass r==NULL to start
260      - loop the number of times indicated by m->count
261 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263                                                      uint32_t *reqid,
264                                                      struct ctdb_ltdb_header *header,
265                                                      TDB_DATA *key, TDB_DATA *data)
266 {
267         if (r == NULL) {
268                 r = (struct ctdb_rec_data *)&m->data[0];
269         } else {
270                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271         }
272
273         if (reqid != NULL) {
274                 *reqid = r->reqid;
275         }
276
277         if (key != NULL) {
278                 key->dptr   = &r->data[0];
279                 key->dsize  = r->keylen;
280         }
281         if (data != NULL) {
282                 data->dptr  = &r->data[r->keylen];
283                 data->dsize = r->datalen;
284                 if (header != NULL) {
285                         data->dptr += sizeof(*header);
286                         data->dsize -= sizeof(*header);
287                 }
288         }
289
290         if (header != NULL) {
291                 if (r->datalen < sizeof(*header)) {
292                         return NULL;
293                 }
294                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295         }
296
297         return r;
298 }
299
300
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
302 {
303         int32_t status;
304         NTSTATUS ret;
305         TDB_DATA indata;
306
307         indata.dptr = (uint8_t *)&db_id;
308         indata.dsize = sizeof(db_id);
309
310         ret = ctdbd_control_local(messaging_ctdbd_connection(),
311                                   CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312                                   indata, NULL, NULL, &status);
313
314         if (!NT_STATUS_IS_OK(ret)) {
315                 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316                 return -1;
317         }
318
319         return status;
320 }
321
322
323 /**
324  * CTDB transaction destructor
325  */
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
327 {
328         tdb_transaction_cancel(h->ctx->wtdb->tdb);
329         return 0;
330 }
331
332 /**
333  * start a transaction on a ctdb database:
334  * - lock the transaction lock key
335  * - start the tdb transaction
336  */
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
338 {
339         struct db_record *rh;
340         struct db_ctdb_rec *crec;
341         TDB_DATA key;
342         TALLOC_CTX *tmp_ctx;
343         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344         int ret;
345         struct db_ctdb_ctx *ctx = h->ctx;
346         TDB_DATA data;
347         pid_t pid;
348         NTSTATUS status;
349         struct ctdb_ltdb_header header;
350         int32_t transaction_status;
351
352         key.dptr = (uint8_t *)discard_const(keyname);
353         key.dsize = strlen(keyname);
354
355 again:
356         tmp_ctx = talloc_new(h);
357
358         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359         if (rh == NULL) {
360                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
361                 talloc_free(tmp_ctx);
362                 return -1;
363         }
364         crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
365
366         transaction_status = db_ctdb_transaction_active(ctx->db_id);
367         if (transaction_status == 1) {
368                 unsigned long int usec = (1000 + random()) % 100000;
369                 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370                           "Re-trying after %lu microseconds...",
371                           ctx->db_id, usec));
372                 talloc_free(tmp_ctx);
373                 usleep(usec);
374                 goto again;
375         }
376
377         /*
378          * store the pid in the database:
379          * it is not enought that the node is dmaster...
380          */
381         pid = getpid();
382         data.dptr = (unsigned char *)&pid;
383         data.dsize = sizeof(pid_t);
384         status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
385         if (!NT_STATUS_IS_OK(status)) {
386                 DEBUG(0, (__location__ " Failed to store pid in transaction "
387                           "record: %s\n", nt_errstr(status)));
388                 talloc_free(tmp_ctx);
389                 return -1;
390         }
391
392         talloc_free(rh);
393
394         ret = tdb_transaction_start(ctx->wtdb->tdb);
395         if (ret != 0) {
396                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
397                 talloc_free(tmp_ctx);
398                 return -1;
399         }
400
401         status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
402         if (!NT_STATUS_IS_OK(status)) {
403                 DEBUG(0, (__location__ " failed to refetch transaction lock "
404                           "record inside transaction: %s - retrying\n",
405                           nt_errstr(status)));
406                 tdb_transaction_cancel(ctx->wtdb->tdb);
407                 talloc_free(tmp_ctx);
408                 goto again;
409         }
410
411         if (header.dmaster != get_my_vnn()) {
412                 DEBUG(3, (__location__ " refetch transaction lock record : "
413                           "we are not dmaster any more "
414                           "(dmaster[%u] != my_vnn[%u]) - retrying\n",
415                           header.dmaster, get_my_vnn()));
416                 tdb_transaction_cancel(ctx->wtdb->tdb);
417                 talloc_free(tmp_ctx);
418                 goto again;
419         }
420
421         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
422                 DEBUG(3, (__location__ " refetch transaction lock record: "
423                           "another local process has started a transaction "
424                           "(stored pid [%u] != my pid [%u]) - retrying\n",
425                           *(pid_t *)(data.dptr), pid));
426                 tdb_transaction_cancel(ctx->wtdb->tdb);
427                 talloc_free(tmp_ctx);
428                 goto again;
429         }
430
431         talloc_free(tmp_ctx);
432
433         return 0;
434 }
435
436
437 /**
438  * CTDB dbwrap API: transaction_start function
439  * starts a transaction on a persistent database
440  */
441 static int db_ctdb_transaction_start(struct db_context *db)
442 {
443         struct db_ctdb_transaction_handle *h;
444         int ret;
445         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
446                                                         struct db_ctdb_ctx);
447
448         if (!db->persistent) {
449                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
450                          ctx->db_id));
451                 return -1;
452         }
453
454         if (ctx->transaction) {
455                 ctx->transaction->nesting++;
456                 return 0;
457         }
458
459         h = talloc_zero(db, struct db_ctdb_transaction_handle);
460         if (h == NULL) {
461                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
462                 return -1;
463         }
464
465         h->ctx = ctx;
466
467         ret = db_ctdb_transaction_fetch_start(h);
468         if (ret != 0) {
469                 talloc_free(h);
470                 return -1;
471         }
472
473         talloc_set_destructor(h, db_ctdb_transaction_destructor);
474
475         ctx->transaction = h;
476
477         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
478
479         return 0;
480 }
481
482
483
484 /*
485   fetch a record inside a transaction
486  */
487 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
488                                      TALLOC_CTX *mem_ctx, 
489                                      TDB_DATA key, TDB_DATA *data)
490 {
491         struct db_ctdb_transaction_handle *h = db->transaction;
492         NTSTATUS status;
493
494         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
495
496         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
497                 *data = tdb_null;
498         } else if (!NT_STATUS_IS_OK(status)) {
499                 return -1;
500         }
501
502         if (!h->in_replay) {
503                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
504                 if (h->m_all == NULL) {
505                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
506                         data->dsize = 0;
507                         talloc_free(data->dptr);
508                         return -1;
509                 }
510         }
511
512         return 0;
513 }
514
515
516 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
517 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
518
519 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
520                                                           TALLOC_CTX *mem_ctx,
521                                                           TDB_DATA key)
522 {
523         struct db_record *result;
524         TDB_DATA ctdb_data;
525
526         if (!(result = talloc(mem_ctx, struct db_record))) {
527                 DEBUG(0, ("talloc failed\n"));
528                 return NULL;
529         }
530
531         result->private_data = ctx->transaction;
532
533         result->key.dsize = key.dsize;
534         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
535         if (result->key.dptr == NULL) {
536                 DEBUG(0, ("talloc failed\n"));
537                 TALLOC_FREE(result);
538                 return NULL;
539         }
540
541         result->store = db_ctdb_store_transaction;
542         result->delete_rec = db_ctdb_delete_transaction;
543
544         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
545         if (ctdb_data.dptr == NULL) {
546                 /* create the record */
547                 result->value = tdb_null;
548                 return result;
549         }
550
551         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
552         result->value.dptr = NULL;
553
554         if ((result->value.dsize != 0)
555             && !(result->value.dptr = (uint8 *)talloc_memdup(
556                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
557                          result->value.dsize))) {
558                 DEBUG(0, ("talloc failed\n"));
559                 TALLOC_FREE(result);
560         }
561
562         SAFE_FREE(ctdb_data.dptr);
563
564         return result;
565 }
566
567 static int db_ctdb_record_destructor(struct db_record **recp)
568 {
569         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
570         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
571                 rec->private_data, struct db_ctdb_transaction_handle);
572         int ret = h->ctx->db->transaction_commit(h->ctx->db);
573         if (ret != 0) {
574                 DEBUG(0,(__location__ " transaction_commit failed\n"));
575         }
576         return 0;
577 }
578
579 /*
580   auto-create a transaction for persistent databases
581  */
582 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
583                                                          TALLOC_CTX *mem_ctx,
584                                                          TDB_DATA key)
585 {
586         int res;
587         struct db_record *rec, **recp;
588
589         res = db_ctdb_transaction_start(ctx->db);
590         if (res == -1) {
591                 return NULL;
592         }
593
594         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
595         if (rec == NULL) {
596                 ctx->db->transaction_cancel(ctx->db);           
597                 return NULL;
598         }
599
600         /* destroy this transaction when we release the lock */
601         recp = talloc(rec, struct db_record *);
602         if (recp == NULL) {
603                 ctx->db->transaction_cancel(ctx->db);
604                 talloc_free(rec);
605                 return NULL;
606         }
607         *recp = rec;
608         talloc_set_destructor(recp, db_ctdb_record_destructor);
609         return rec;
610 }
611
612
613 /*
614   stores a record inside a transaction
615  */
616 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
617                                      TDB_DATA key, TDB_DATA data)
618 {
619         TALLOC_CTX *tmp_ctx = talloc_new(h);
620         int ret;
621         TDB_DATA rec;
622         struct ctdb_ltdb_header header;
623         NTSTATUS status;
624
625         /* we need the header so we can update the RSN */
626         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
627         if (rec.dptr == NULL) {
628                 /* the record doesn't exist - create one with us as dmaster.
629                    This is only safe because we are in a transaction and this
630                    is a persistent database */
631                 ZERO_STRUCT(header);
632         } else {
633                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
634                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
635                 /* a special case, we are writing the same data that is there now */
636                 if (data.dsize == rec.dsize &&
637                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
638                         SAFE_FREE(rec.dptr);
639                         talloc_free(tmp_ctx);
640                         return 0;
641                 }
642                 SAFE_FREE(rec.dptr);
643         }
644
645         header.dmaster = get_my_vnn();
646         header.rsn++;
647
648         if (!h->in_replay) {
649                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
650                 if (h->m_all == NULL) {
651                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
652                         talloc_free(tmp_ctx);
653                         return -1;
654                 }
655         }
656
657         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
658         if (h->m_write == NULL) {
659                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
660                 talloc_free(tmp_ctx);
661                 return -1;
662         }
663
664         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
665         if (NT_STATUS_IS_OK(status)) {
666                 ret = 0;
667         } else {
668                 ret = -1;
669         }
670
671         talloc_free(tmp_ctx);
672
673         return ret;
674 }
675
676
677 /* 
678    a record store inside a transaction
679  */
680 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
681 {
682         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
683                 rec->private_data, struct db_ctdb_transaction_handle);
684         int ret;
685
686         ret = db_ctdb_transaction_store(h, rec->key, data);
687         if (ret != 0) {
688                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
689         }
690         return NT_STATUS_OK;
691 }
692
693 /* 
694    a record delete inside a transaction
695  */
696 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
697 {
698         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
699                 rec->private_data, struct db_ctdb_transaction_handle);
700         int ret;
701
702         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
703         if (ret != 0) {
704                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
705         }
706         return NT_STATUS_OK;
707 }
708
709
710 /*
711   replay a transaction
712  */
713 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
714 {
715         int ret, i;
716         struct ctdb_rec_data *rec = NULL;
717
718         h->in_replay = true;
719         talloc_free(h->m_write);
720         h->m_write = NULL;
721
722         ret = db_ctdb_transaction_fetch_start(h);
723         if (ret != 0) {
724                 return ret;
725         }
726
727         for (i=0;i<h->m_all->count;i++) {
728                 TDB_DATA key, data;
729
730                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
731                 if (rec == NULL) {
732                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
733                         goto failed;
734                 }
735
736                 if (rec->reqid == 0) {
737                         /* its a store */
738                         if (db_ctdb_transaction_store(h, key, data) != 0) {
739                                 goto failed;
740                         }
741                 } else {
742                         TDB_DATA data2;
743                         TALLOC_CTX *tmp_ctx = talloc_new(h);
744
745                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
746                                 talloc_free(tmp_ctx);
747                                 goto failed;
748                         }
749                         if (data2.dsize != data.dsize ||
750                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
751                                 /* the record has changed on us - we have to give up */
752                                 talloc_free(tmp_ctx);
753                                 goto failed;
754                         }
755                         talloc_free(tmp_ctx);
756                 }
757         }
758
759         return 0;
760
761 failed:
762         tdb_transaction_cancel(h->ctx->wtdb->tdb);
763         return -1;
764 }
765
766
767 /*
768   commit a transaction
769  */
770 static int db_ctdb_transaction_commit(struct db_context *db)
771 {
772         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
773                                                         struct db_ctdb_ctx);
774         NTSTATUS rets;
775         int ret;
776         int status;
777         int retries = 0;
778         struct db_ctdb_transaction_handle *h = ctx->transaction;
779         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
780
781         if (h == NULL) {
782                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
783                 return -1;
784         }
785
786         if (h->nested_cancel) {
787                 db->transaction_cancel(db);
788                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
789                 return -1;
790         }
791
792         if (h->nesting != 0) {
793                 h->nesting--;
794                 return 0;
795         }
796
797         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
798
799         talloc_set_destructor(h, NULL);
800
801         /* our commit strategy is quite complex.
802
803            - we first try to commit the changes to all other nodes
804
805            - if that works, then we commit locally and we are done
806
807            - if a commit on another node fails, then we need to cancel
808              the transaction, then restart the transaction (thus
809              opening a window of time for a pending recovery to
810              complete), then replay the transaction, checking all the
811              reads and writes (checking that reads give the same data,
812              and writes succeed). Then we retry the transaction to the
813              other nodes
814         */
815
816 again:
817         if (h->m_write == NULL) {
818                 /* no changes were made, potentially after a retry */
819                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
820                 talloc_free(h);
821                 ctx->transaction = NULL;
822                 return 0;
823         }
824
825         /* tell ctdbd to commit to the other nodes */
826         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
827                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
828                                    h->ctx->db_id, 0,
829                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
830         if (!NT_STATUS_IS_OK(rets) || status != 0) {
831                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
832                 sleep(1);
833
834                 if (!NT_STATUS_IS_OK(rets)) {
835                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
836                 } else {
837                         /* work out what error code we will give if we 
838                            have to fail the operation */
839                         switch ((enum ctdb_trans2_commit_error)status) {
840                         case CTDB_TRANS2_COMMIT_SUCCESS:
841                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
842                         case CTDB_TRANS2_COMMIT_TIMEOUT:
843                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
844                                 break;
845                         case CTDB_TRANS2_COMMIT_ALLFAIL:
846                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
847                                 break;
848                         }
849                 }
850
851                 if (++retries == 5) {
852                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
853                                  h->ctx->db_id, retries, (unsigned)failure_control));
854                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
855                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
856                                             tdb_null, NULL, NULL, NULL);
857                         h->ctx->transaction = NULL;
858                         talloc_free(h);
859                         ctx->transaction = NULL;
860                         return -1;                      
861                 }
862
863                 if (ctdb_replay_transaction(h) != 0) {
864                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
865                                  (unsigned)failure_control));
866                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
867                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
868                                             tdb_null, NULL, NULL, NULL);
869                         h->ctx->transaction = NULL;
870                         talloc_free(h);
871                         ctx->transaction = NULL;
872                         return -1;
873                 }
874                 goto again;
875         } else {
876                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
877         }
878
879         /* do the real commit locally */
880         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
881         if (ret != 0) {
882                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
883                          (unsigned)failure_control));
884                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
885                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
886                 h->ctx->transaction = NULL;
887                 talloc_free(h);
888                 return ret;
889         }
890
891         /* tell ctdbd that we are finished with our local commit */
892         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
893                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
894                             tdb_null, NULL, NULL, NULL);
895         h->ctx->transaction = NULL;
896         talloc_free(h);
897         return 0;
898 }
899
900
901 /*
902   cancel a transaction
903  */
904 static int db_ctdb_transaction_cancel(struct db_context *db)
905 {
906         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
907                                                         struct db_ctdb_ctx);
908         struct db_ctdb_transaction_handle *h = ctx->transaction;
909
910         if (h == NULL) {
911                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
912                 return -1;
913         }
914
915         if (h->nesting != 0) {
916                 h->nesting--;
917                 h->nested_cancel = true;
918                 return 0;
919         }
920
921         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
922
923         ctx->transaction = NULL;
924         talloc_free(h);
925         return 0;
926 }
927
928
929 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
930 {
931         struct db_ctdb_rec *crec = talloc_get_type_abort(
932                 rec->private_data, struct db_ctdb_rec);
933
934         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
935 }
936
937
938
939 static NTSTATUS db_ctdb_delete(struct db_record *rec)
940 {
941         TDB_DATA data;
942
943         /*
944          * We have to store the header with empty data. TODO: Fix the
945          * tdb-level cleanup
946          */
947
948         ZERO_STRUCT(data);
949
950         return db_ctdb_store(rec, data, 0);
951
952 }
953
954 static int db_ctdb_record_destr(struct db_record* data)
955 {
956         struct db_ctdb_rec *crec = talloc_get_type_abort(
957                 data->private_data, struct db_ctdb_rec);
958
959         DEBUG(10, (DEBUGLEVEL > 10
960                    ? "Unlocking db %u key %s\n"
961                    : "Unlocking db %u key %.20s\n",
962                    (int)crec->ctdb_ctx->db_id,
963                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
964                               data->key.dsize)));
965
966         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
967                 DEBUG(0, ("tdb_chainunlock failed\n"));
968                 return -1;
969         }
970
971         return 0;
972 }
973
974 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
975                                                TALLOC_CTX *mem_ctx,
976                                                TDB_DATA key,
977                                                bool persistent)
978 {
979         struct db_record *result;
980         struct db_ctdb_rec *crec;
981         NTSTATUS status;
982         TDB_DATA ctdb_data;
983         int migrate_attempts = 0;
984
985         if (!(result = talloc(mem_ctx, struct db_record))) {
986                 DEBUG(0, ("talloc failed\n"));
987                 return NULL;
988         }
989
990         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
991                 DEBUG(0, ("talloc failed\n"));
992                 TALLOC_FREE(result);
993                 return NULL;
994         }
995
996         result->private_data = (void *)crec;
997         crec->ctdb_ctx = ctx;
998
999         result->key.dsize = key.dsize;
1000         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
1001         if (result->key.dptr == NULL) {
1002                 DEBUG(0, ("talloc failed\n"));
1003                 TALLOC_FREE(result);
1004                 return NULL;
1005         }
1006
1007         /*
1008          * Do a blocking lock on the record
1009          */
1010 again:
1011
1012         if (DEBUGLEVEL >= 10) {
1013                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1014                 DEBUG(10, (DEBUGLEVEL > 10
1015                            ? "Locking db %u key %s\n"
1016                            : "Locking db %u key %.20s\n",
1017                            (int)crec->ctdb_ctx->db_id, keystr));
1018                 TALLOC_FREE(keystr);
1019         }
1020
1021         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1022                 DEBUG(3, ("tdb_chainlock failed\n"));
1023                 TALLOC_FREE(result);
1024                 return NULL;
1025         }
1026
1027         result->store = db_ctdb_store;
1028         result->delete_rec = db_ctdb_delete;
1029         talloc_set_destructor(result, db_ctdb_record_destr);
1030
1031         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1032
1033         /*
1034          * See if we have a valid record and we are the dmaster. If so, we can
1035          * take the shortcut and just return it.
1036          */
1037
1038         if ((ctdb_data.dptr == NULL) ||
1039             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1040             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1041 #if 0
1042             || (random() % 2 != 0)
1043 #endif
1044 ) {
1045                 SAFE_FREE(ctdb_data.dptr);
1046                 tdb_chainunlock(ctx->wtdb->tdb, key);
1047                 talloc_set_destructor(result, NULL);
1048
1049                 migrate_attempts += 1;
1050
1051                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1052                            ctdb_data.dptr, ctdb_data.dptr ?
1053                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1054                            get_my_vnn()));
1055
1056                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1057                 if (!NT_STATUS_IS_OK(status)) {
1058                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1059                                   nt_errstr(status)));
1060                         TALLOC_FREE(result);
1061                         return NULL;
1062                 }
1063                 /* now its migrated, try again */
1064                 goto again;
1065         }
1066
1067         if (migrate_attempts > 10) {
1068                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1069                           migrate_attempts));
1070         }
1071
1072         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1073
1074         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1075         result->value.dptr = NULL;
1076
1077         if ((result->value.dsize != 0)
1078             && !(result->value.dptr = (uint8 *)talloc_memdup(
1079                          result, ctdb_data.dptr + sizeof(crec->header),
1080                          result->value.dsize))) {
1081                 DEBUG(0, ("talloc failed\n"));
1082                 TALLOC_FREE(result);
1083         }
1084
1085         SAFE_FREE(ctdb_data.dptr);
1086
1087         return result;
1088 }
1089
1090 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1091                                               TALLOC_CTX *mem_ctx,
1092                                               TDB_DATA key)
1093 {
1094         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1095                                                         struct db_ctdb_ctx);
1096
1097         if (ctx->transaction != NULL) {
1098                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1099         }
1100
1101         if (db->persistent) {
1102                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1103         }
1104
1105         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1106 }
1107
1108 /*
1109   fetch (unlocked, no migration) operation on ctdb
1110  */
1111 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1112                          TDB_DATA key, TDB_DATA *data)
1113 {
1114         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1115                                                         struct db_ctdb_ctx);
1116         NTSTATUS status;
1117         TDB_DATA ctdb_data;
1118
1119         if (ctx->transaction) {
1120                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1121         }
1122
1123         /* try a direct fetch */
1124         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1125
1126         /*
1127          * See if we have a valid record and we are the dmaster. If so, we can
1128          * take the shortcut and just return it.
1129          * we bypass the dmaster check for persistent databases
1130          */
1131         if ((ctdb_data.dptr != NULL) &&
1132             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1133             (db->persistent ||
1134              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1135                 /* we are the dmaster - avoid the ctdb protocol op */
1136
1137                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1138                 if (data->dsize == 0) {
1139                         SAFE_FREE(ctdb_data.dptr);
1140                         data->dptr = NULL;
1141                         return 0;
1142                 }
1143
1144                 data->dptr = (uint8 *)talloc_memdup(
1145                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1146                         data->dsize);
1147
1148                 SAFE_FREE(ctdb_data.dptr);
1149
1150                 if (data->dptr == NULL) {
1151                         return -1;
1152                 }
1153                 return 0;
1154         }
1155
1156         SAFE_FREE(ctdb_data.dptr);
1157
1158         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1159         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1160         if (!NT_STATUS_IS_OK(status)) {
1161                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1162                 return -1;
1163         }
1164
1165         return 0;
1166 }
1167
1168 struct traverse_state {
1169         struct db_context *db;
1170         int (*fn)(struct db_record *rec, void *private_data);
1171         void *private_data;
1172 };
1173
1174 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1175 {
1176         struct traverse_state *state = (struct traverse_state *)private_data;
1177         struct db_record *rec;
1178         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1179         /* we have to give them a locked record to prevent races */
1180         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1181         if (rec && rec->value.dsize > 0) {
1182                 state->fn(rec, state->private_data);
1183         }
1184         talloc_free(tmp_ctx);
1185 }
1186
1187 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1188                                         void *private_data)
1189 {
1190         struct traverse_state *state = (struct traverse_state *)private_data;
1191         struct db_record *rec;
1192         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1193         int ret = 0;
1194         /* we have to give them a locked record to prevent races */
1195         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1196         if (rec && rec->value.dsize > 0) {
1197                 ret = state->fn(rec, state->private_data);
1198         }
1199         talloc_free(tmp_ctx);
1200         return ret;
1201 }
1202
1203 static int db_ctdb_traverse(struct db_context *db,
1204                             int (*fn)(struct db_record *rec,
1205                                       void *private_data),
1206                             void *private_data)
1207 {
1208         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1209                                                         struct db_ctdb_ctx);
1210         struct traverse_state state;
1211
1212         state.db = db;
1213         state.fn = fn;
1214         state.private_data = private_data;
1215
1216         if (db->persistent) {
1217                 /* for persistent databases we don't need to do a ctdb traverse,
1218                    we can do a faster local traverse */
1219                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1220         }
1221
1222
1223         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1224         return 0;
1225 }
1226
1227 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1228 {
1229         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1230 }
1231
1232 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1233 {
1234         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1235 }
1236
1237 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1238 {
1239         struct traverse_state *state = (struct traverse_state *)private_data;
1240         struct db_record rec;
1241         rec.key = key;
1242         rec.value = data;
1243         rec.store = db_ctdb_store_deny;
1244         rec.delete_rec = db_ctdb_delete_deny;
1245         rec.private_data = state->db;
1246         state->fn(&rec, state->private_data);
1247 }
1248
1249 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1250                                         void *private_data)
1251 {
1252         struct traverse_state *state = (struct traverse_state *)private_data;
1253         struct db_record rec;
1254         rec.key = kbuf;
1255         rec.value = dbuf;
1256         rec.store = db_ctdb_store_deny;
1257         rec.delete_rec = db_ctdb_delete_deny;
1258         rec.private_data = state->db;
1259
1260         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1261                 /* a deleted record */
1262                 return 0;
1263         }
1264         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1265         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1266
1267         return state->fn(&rec, state->private_data);
1268 }
1269
1270 static int db_ctdb_traverse_read(struct db_context *db,
1271                                  int (*fn)(struct db_record *rec,
1272                                            void *private_data),
1273                                  void *private_data)
1274 {
1275         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1276                                                         struct db_ctdb_ctx);
1277         struct traverse_state state;
1278
1279         state.db = db;
1280         state.fn = fn;
1281         state.private_data = private_data;
1282
1283         if (db->persistent) {
1284                 /* for persistent databases we don't need to do a ctdb traverse,
1285                    we can do a faster local traverse */
1286                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1287         }
1288
1289         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1290         return 0;
1291 }
1292
1293 static int db_ctdb_get_seqnum(struct db_context *db)
1294 {
1295         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1296                                                         struct db_ctdb_ctx);
1297         return tdb_get_seqnum(ctx->wtdb->tdb);
1298 }
1299
1300 static int db_ctdb_get_flags(struct db_context *db)
1301 {
1302         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1303                                                         struct db_ctdb_ctx);
1304         return tdb_get_flags(ctx->wtdb->tdb);
1305 }
1306
1307 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1308                                 const char *name,
1309                                 int hash_size, int tdb_flags,
1310                                 int open_flags, mode_t mode)
1311 {
1312         struct db_context *result;
1313         struct db_ctdb_ctx *db_ctdb;
1314         char *db_path;
1315
1316         if (!lp_clustering()) {
1317                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1318                 return NULL;
1319         }
1320
1321         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1322                 DEBUG(0, ("talloc failed\n"));
1323                 TALLOC_FREE(result);
1324                 return NULL;
1325         }
1326
1327         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1328                 DEBUG(0, ("talloc failed\n"));
1329                 TALLOC_FREE(result);
1330                 return NULL;
1331         }
1332
1333         db_ctdb->transaction = NULL;
1334         db_ctdb->db = result;
1335
1336         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1337                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1338                 TALLOC_FREE(result);
1339                 return NULL;
1340         }
1341
1342         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1343
1344         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1345
1346         /* only pass through specific flags */
1347         tdb_flags &= TDB_SEQNUM;
1348
1349         /* honor permissions if user has specified O_CREAT */
1350         if (open_flags & O_CREAT) {
1351                 chmod(db_path, mode);
1352         }
1353
1354         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1355         if (db_ctdb->wtdb == NULL) {
1356                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1357                 TALLOC_FREE(result);
1358                 return NULL;
1359         }
1360         talloc_free(db_path);
1361
1362         result->private_data = (void *)db_ctdb;
1363         result->fetch_locked = db_ctdb_fetch_locked;
1364         result->fetch = db_ctdb_fetch;
1365         result->traverse = db_ctdb_traverse;
1366         result->traverse_read = db_ctdb_traverse_read;
1367         result->get_seqnum = db_ctdb_get_seqnum;
1368         result->get_flags = db_ctdb_get_flags;
1369         result->transaction_start = db_ctdb_transaction_start;
1370         result->transaction_commit = db_ctdb_transaction_commit;
1371         result->transaction_cancel = db_ctdb_transaction_cancel;
1372
1373         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1374                  name, db_ctdb->db_id));
1375
1376         return result;
1377 }
1378 #endif