9e57aadb258372b8f3f97848593b3bc76c8cb2bf
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /**
79  * fetch a record from the tdb, separating out the header
80  * information and returning the body of the record.
81  */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TALLOC_CTX *mem_ctx,
86                                    TDB_DATA *data)
87 {
88         TDB_DATA rec;
89         NTSTATUS status;
90
91         rec = tdb_fetch(db->wtdb->tdb, key);
92         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93                 status = NT_STATUS_NOT_FOUND;
94                 if (data) {
95                         ZERO_STRUCTP(data);
96                 }
97                 if (header) {
98                         header->dmaster = (uint32_t)-1;
99                         header->rsn = 0;
100                 }
101                 goto done;
102         }
103
104         if (header) {
105                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106         }
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 if (data->dsize == 0) {
111                         data->dptr = NULL;
112                 } else {
113                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114                                         rec.dptr
115                                          + sizeof(struct ctdb_ltdb_header),
116                                         data->dsize);
117                         if (data->dptr == NULL) {
118                                 status = NT_STATUS_NO_MEMORY;
119                                 goto done;
120                         }
121                 }
122         }
123
124         status = NT_STATUS_OK;
125
126 done:
127         SAFE_FREE(rec.dptr);
128         return status;
129 }
130
131 /*
132  * Store a record together with the ctdb record header
133  * in the local copy of the database.
134  */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136                                    TDB_DATA key,
137                                    struct ctdb_ltdb_header *header,
138                                    TDB_DATA data)
139 {
140         TALLOC_CTX *tmp_ctx = talloc_stackframe();
141         TDB_DATA rec;
142         int ret;
143
144         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147         if (rec.dptr == NULL) {
148                 talloc_free(tmp_ctx);
149                 return NT_STATUS_NO_MEMORY;
150         }
151
152         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157         talloc_free(tmp_ctx);
158
159         return (ret == 0) ? NT_STATUS_OK
160                           : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162 }
163
164 /*
165   form a ctdb_rec_data record from a key/data pair
166
167   note that header may be NULL. If not NULL then it is included in the data portion
168   of the record
169  */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
171                                                   TDB_DATA key, 
172                                                   struct ctdb_ltdb_header *header,
173                                                   TDB_DATA data)
174 {
175         size_t length;
176         struct ctdb_rec_data *d;
177
178         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
179                 data.dsize + (header?sizeof(*header):0);
180         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181         if (d == NULL) {
182                 return NULL;
183         }
184         d->length = length;
185         d->reqid = reqid;
186         d->keylen = key.dsize;
187         memcpy(&d->data[0], key.dptr, key.dsize);
188         if (header) {
189                 d->datalen = data.dsize + sizeof(*header);
190                 memcpy(&d->data[key.dsize], header, sizeof(*header));
191                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192         } else {
193                 d->datalen = data.dsize;
194                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195         }
196         return d;
197 }
198
199
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
202                                                struct ctdb_marshall_buffer *m,
203                                                uint64_t db_id,
204                                                uint32_t reqid,
205                                                TDB_DATA key,
206                                                struct ctdb_ltdb_header *header,
207                                                TDB_DATA data)
208 {
209         struct ctdb_rec_data *r;
210         size_t m_size, r_size;
211         struct ctdb_marshall_buffer *m2 = NULL;
212
213         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214         if (r == NULL) {
215                 talloc_free(m);
216                 return NULL;
217         }
218
219         if (m == NULL) {
220                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222                 if (m == NULL) {
223                         goto done;
224                 }
225                 m->db_id = db_id;
226         }
227
228         m_size = talloc_get_size(m);
229         r_size = talloc_get_size(r);
230
231         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232                 mem_ctx, m,  m_size + r_size);
233         if (m2 == NULL) {
234                 talloc_free(m);
235                 goto done;
236         }
237
238         memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240         m2->count++;
241
242 done:
243         talloc_free(r);
244         return m2;
245 }
246
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249 {
250         TDB_DATA data;
251         data.dptr = (uint8_t *)m;
252         data.dsize = talloc_get_size(m);
253         return data;
254 }
255
256 /* 
257    loop over a marshalling buffer 
258
259      - pass r==NULL to start
260      - loop the number of times indicated by m->count
261 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263                                                      uint32_t *reqid,
264                                                      struct ctdb_ltdb_header *header,
265                                                      TDB_DATA *key, TDB_DATA *data)
266 {
267         if (r == NULL) {
268                 r = (struct ctdb_rec_data *)&m->data[0];
269         } else {
270                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271         }
272
273         if (reqid != NULL) {
274                 *reqid = r->reqid;
275         }
276
277         if (key != NULL) {
278                 key->dptr   = &r->data[0];
279                 key->dsize  = r->keylen;
280         }
281         if (data != NULL) {
282                 data->dptr  = &r->data[r->keylen];
283                 data->dsize = r->datalen;
284                 if (header != NULL) {
285                         data->dptr += sizeof(*header);
286                         data->dsize -= sizeof(*header);
287                 }
288         }
289
290         if (header != NULL) {
291                 if (r->datalen < sizeof(*header)) {
292                         return NULL;
293                 }
294                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295         }
296
297         return r;
298 }
299
300
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
302 {
303         int32_t status;
304         NTSTATUS ret;
305         TDB_DATA indata;
306
307         indata.dptr = (uint8_t *)&db_id;
308         indata.dsize = sizeof(db_id);
309
310         ret = ctdbd_control_local(messaging_ctdbd_connection(),
311                                   CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312                                   indata, NULL, NULL, &status);
313
314         if (!NT_STATUS_IS_OK(ret)) {
315                 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316                 return -1;
317         }
318
319         return status;
320 }
321
322
323 /**
324  * CTDB transaction destructor
325  */
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
327 {
328         tdb_transaction_cancel(h->ctx->wtdb->tdb);
329         return 0;
330 }
331
332 /**
333  * start a transaction on a ctdb database:
334  * - lock the transaction lock key
335  * - start the tdb transaction
336  */
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
338 {
339         struct db_record *rh;
340         struct db_ctdb_rec *crec;
341         TDB_DATA key;
342         TALLOC_CTX *tmp_ctx;
343         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344         int ret;
345         struct db_ctdb_ctx *ctx = h->ctx;
346         TDB_DATA data;
347         pid_t pid;
348         NTSTATUS status;
349         struct ctdb_ltdb_header header;
350         int32_t transaction_status;
351
352         key.dptr = (uint8_t *)discard_const(keyname);
353         key.dsize = strlen(keyname);
354
355 again:
356         tmp_ctx = talloc_new(h);
357
358         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
359         if (rh == NULL) {
360                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
361                 talloc_free(tmp_ctx);
362                 return -1;
363         }
364         crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
365
366         transaction_status = db_ctdb_transaction_active(ctx->db_id);
367         if (transaction_status == 1) {
368                 unsigned long int usec = (1000 + random()) % 100000;
369                 DEBUG(3, ("Transaction already active on db_id[0x%08x]."
370                           "Re-trying after %lu microseconds...",
371                           ctx->db_id, usec));
372                 talloc_free(tmp_ctx);
373                 usleep(usec);
374                 goto again;
375         }
376
377         /*
378          * store the pid in the database:
379          * it is not enought that the node is dmaster...
380          */
381         pid = getpid();
382         data.dptr = (unsigned char *)&pid;
383         data.dsize = sizeof(pid_t);
384         crec->header.rsn++;
385         crec->header.dmaster = get_my_vnn();
386         status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
387         if (!NT_STATUS_IS_OK(status)) {
388                 DEBUG(0, (__location__ " Failed to store pid in transaction "
389                           "record: %s\n", nt_errstr(status)));
390                 talloc_free(tmp_ctx);
391                 return -1;
392         }
393
394         talloc_free(rh);
395
396         ret = tdb_transaction_start(ctx->wtdb->tdb);
397         if (ret != 0) {
398                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
399                 talloc_free(tmp_ctx);
400                 return -1;
401         }
402
403         status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
404         if (!NT_STATUS_IS_OK(status)) {
405                 DEBUG(0, (__location__ " failed to refetch transaction lock "
406                           "record inside transaction: %s - retrying\n",
407                           nt_errstr(status)));
408                 tdb_transaction_cancel(ctx->wtdb->tdb);
409                 talloc_free(tmp_ctx);
410                 goto again;
411         }
412
413         if (header.dmaster != get_my_vnn()) {
414                 DEBUG(3, (__location__ " refetch transaction lock record : "
415                           "we are not dmaster any more "
416                           "(dmaster[%u] != my_vnn[%u]) - retrying\n",
417                           header.dmaster, get_my_vnn()));
418                 tdb_transaction_cancel(ctx->wtdb->tdb);
419                 talloc_free(tmp_ctx);
420                 goto again;
421         }
422
423         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
424                 DEBUG(3, (__location__ " refetch transaction lock record: "
425                           "another local process has started a transaction "
426                           "(stored pid [%u] != my pid [%u]) - retrying\n",
427                           *(pid_t *)(data.dptr), pid));
428                 tdb_transaction_cancel(ctx->wtdb->tdb);
429                 talloc_free(tmp_ctx);
430                 goto again;
431         }
432
433         talloc_free(tmp_ctx);
434
435         return 0;
436 }
437
438
439 /**
440  * CTDB dbwrap API: transaction_start function
441  * starts a transaction on a persistent database
442  */
443 static int db_ctdb_transaction_start(struct db_context *db)
444 {
445         struct db_ctdb_transaction_handle *h;
446         int ret;
447         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
448                                                         struct db_ctdb_ctx);
449
450         if (!db->persistent) {
451                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
452                          ctx->db_id));
453                 return -1;
454         }
455
456         if (ctx->transaction) {
457                 ctx->transaction->nesting++;
458                 return 0;
459         }
460
461         h = talloc_zero(db, struct db_ctdb_transaction_handle);
462         if (h == NULL) {
463                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
464                 return -1;
465         }
466
467         h->ctx = ctx;
468
469         ret = db_ctdb_transaction_fetch_start(h);
470         if (ret != 0) {
471                 talloc_free(h);
472                 return -1;
473         }
474
475         talloc_set_destructor(h, db_ctdb_transaction_destructor);
476
477         ctx->transaction = h;
478
479         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
480
481         return 0;
482 }
483
484
485
486 /*
487   fetch a record inside a transaction
488  */
489 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
490                                      TALLOC_CTX *mem_ctx, 
491                                      TDB_DATA key, TDB_DATA *data)
492 {
493         struct db_ctdb_transaction_handle *h = db->transaction;
494         NTSTATUS status;
495
496         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
497
498         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
499                 *data = tdb_null;
500         } else if (!NT_STATUS_IS_OK(status)) {
501                 return -1;
502         }
503
504         if (!h->in_replay) {
505                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
506                 if (h->m_all == NULL) {
507                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
508                         data->dsize = 0;
509                         talloc_free(data->dptr);
510                         return -1;
511                 }
512         }
513
514         return 0;
515 }
516
517
518 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
519 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
520
521 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
522                                                           TALLOC_CTX *mem_ctx,
523                                                           TDB_DATA key)
524 {
525         struct db_record *result;
526         TDB_DATA ctdb_data;
527
528         if (!(result = talloc(mem_ctx, struct db_record))) {
529                 DEBUG(0, ("talloc failed\n"));
530                 return NULL;
531         }
532
533         result->private_data = ctx->transaction;
534
535         result->key.dsize = key.dsize;
536         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
537         if (result->key.dptr == NULL) {
538                 DEBUG(0, ("talloc failed\n"));
539                 TALLOC_FREE(result);
540                 return NULL;
541         }
542
543         result->store = db_ctdb_store_transaction;
544         result->delete_rec = db_ctdb_delete_transaction;
545
546         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
547         if (ctdb_data.dptr == NULL) {
548                 /* create the record */
549                 result->value = tdb_null;
550                 return result;
551         }
552
553         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
554         result->value.dptr = NULL;
555
556         if ((result->value.dsize != 0)
557             && !(result->value.dptr = (uint8 *)talloc_memdup(
558                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
559                          result->value.dsize))) {
560                 DEBUG(0, ("talloc failed\n"));
561                 TALLOC_FREE(result);
562         }
563
564         SAFE_FREE(ctdb_data.dptr);
565
566         return result;
567 }
568
569 static int db_ctdb_record_destructor(struct db_record **recp)
570 {
571         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
572         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
573                 rec->private_data, struct db_ctdb_transaction_handle);
574         int ret = h->ctx->db->transaction_commit(h->ctx->db);
575         if (ret != 0) {
576                 DEBUG(0,(__location__ " transaction_commit failed\n"));
577         }
578         return 0;
579 }
580
581 /*
582   auto-create a transaction for persistent databases
583  */
584 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
585                                                          TALLOC_CTX *mem_ctx,
586                                                          TDB_DATA key)
587 {
588         int res;
589         struct db_record *rec, **recp;
590
591         res = db_ctdb_transaction_start(ctx->db);
592         if (res == -1) {
593                 return NULL;
594         }
595
596         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
597         if (rec == NULL) {
598                 ctx->db->transaction_cancel(ctx->db);           
599                 return NULL;
600         }
601
602         /* destroy this transaction when we release the lock */
603         recp = talloc(rec, struct db_record *);
604         if (recp == NULL) {
605                 ctx->db->transaction_cancel(ctx->db);
606                 talloc_free(rec);
607                 return NULL;
608         }
609         *recp = rec;
610         talloc_set_destructor(recp, db_ctdb_record_destructor);
611         return rec;
612 }
613
614
615 /*
616   stores a record inside a transaction
617  */
618 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
619                                      TDB_DATA key, TDB_DATA data)
620 {
621         TALLOC_CTX *tmp_ctx = talloc_new(h);
622         int ret;
623         TDB_DATA rec;
624         struct ctdb_ltdb_header header;
625         NTSTATUS status;
626
627         /* we need the header so we can update the RSN */
628         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
629         if (rec.dptr == NULL) {
630                 /* the record doesn't exist - create one with us as dmaster.
631                    This is only safe because we are in a transaction and this
632                    is a persistent database */
633                 ZERO_STRUCT(header);
634         } else {
635                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
636                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
637                 /* a special case, we are writing the same data that is there now */
638                 if (data.dsize == rec.dsize &&
639                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
640                         SAFE_FREE(rec.dptr);
641                         talloc_free(tmp_ctx);
642                         return 0;
643                 }
644                 SAFE_FREE(rec.dptr);
645         }
646
647         header.dmaster = get_my_vnn();
648         header.rsn++;
649
650         if (!h->in_replay) {
651                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
652                 if (h->m_all == NULL) {
653                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
654                         talloc_free(tmp_ctx);
655                         return -1;
656                 }
657         }
658
659         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
660         if (h->m_write == NULL) {
661                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
662                 talloc_free(tmp_ctx);
663                 return -1;
664         }
665
666         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
667         if (NT_STATUS_IS_OK(status)) {
668                 ret = 0;
669         } else {
670                 ret = -1;
671         }
672
673         talloc_free(tmp_ctx);
674
675         return ret;
676 }
677
678
679 /* 
680    a record store inside a transaction
681  */
682 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
683 {
684         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
685                 rec->private_data, struct db_ctdb_transaction_handle);
686         int ret;
687
688         ret = db_ctdb_transaction_store(h, rec->key, data);
689         if (ret != 0) {
690                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
691         }
692         return NT_STATUS_OK;
693 }
694
695 /* 
696    a record delete inside a transaction
697  */
698 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
699 {
700         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
701                 rec->private_data, struct db_ctdb_transaction_handle);
702         int ret;
703
704         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
705         if (ret != 0) {
706                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
707         }
708         return NT_STATUS_OK;
709 }
710
711
712 /*
713   replay a transaction
714  */
715 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
716 {
717         int ret, i;
718         struct ctdb_rec_data *rec = NULL;
719
720         h->in_replay = true;
721         talloc_free(h->m_write);
722         h->m_write = NULL;
723
724         ret = db_ctdb_transaction_fetch_start(h);
725         if (ret != 0) {
726                 return ret;
727         }
728
729         for (i=0;i<h->m_all->count;i++) {
730                 TDB_DATA key, data;
731
732                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
733                 if (rec == NULL) {
734                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
735                         goto failed;
736                 }
737
738                 if (rec->reqid == 0) {
739                         /* its a store */
740                         if (db_ctdb_transaction_store(h, key, data) != 0) {
741                                 goto failed;
742                         }
743                 } else {
744                         TDB_DATA data2;
745                         TALLOC_CTX *tmp_ctx = talloc_new(h);
746
747                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
748                                 talloc_free(tmp_ctx);
749                                 goto failed;
750                         }
751                         if (data2.dsize != data.dsize ||
752                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
753                                 /* the record has changed on us - we have to give up */
754                                 talloc_free(tmp_ctx);
755                                 goto failed;
756                         }
757                         talloc_free(tmp_ctx);
758                 }
759         }
760
761         return 0;
762
763 failed:
764         tdb_transaction_cancel(h->ctx->wtdb->tdb);
765         return -1;
766 }
767
768
769 /*
770   commit a transaction
771  */
772 static int db_ctdb_transaction_commit(struct db_context *db)
773 {
774         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
775                                                         struct db_ctdb_ctx);
776         NTSTATUS rets;
777         int ret;
778         int status;
779         int retries = 0;
780         struct db_ctdb_transaction_handle *h = ctx->transaction;
781         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
782
783         if (h == NULL) {
784                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
785                 return -1;
786         }
787
788         if (h->nested_cancel) {
789                 db->transaction_cancel(db);
790                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
791                 return -1;
792         }
793
794         if (h->nesting != 0) {
795                 h->nesting--;
796                 return 0;
797         }
798
799         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
800
801         talloc_set_destructor(h, NULL);
802
803         /* our commit strategy is quite complex.
804
805            - we first try to commit the changes to all other nodes
806
807            - if that works, then we commit locally and we are done
808
809            - if a commit on another node fails, then we need to cancel
810              the transaction, then restart the transaction (thus
811              opening a window of time for a pending recovery to
812              complete), then replay the transaction, checking all the
813              reads and writes (checking that reads give the same data,
814              and writes succeed). Then we retry the transaction to the
815              other nodes
816         */
817
818 again:
819         if (h->m_write == NULL) {
820                 /* no changes were made, potentially after a retry */
821                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
822                 talloc_free(h);
823                 ctx->transaction = NULL;
824                 return 0;
825         }
826
827         /* tell ctdbd to commit to the other nodes */
828         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
829                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
830                                    h->ctx->db_id, 0,
831                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
832         if (!NT_STATUS_IS_OK(rets) || status != 0) {
833                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
834                 sleep(1);
835
836                 if (!NT_STATUS_IS_OK(rets)) {
837                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
838                 } else {
839                         /* work out what error code we will give if we 
840                            have to fail the operation */
841                         switch ((enum ctdb_trans2_commit_error)status) {
842                         case CTDB_TRANS2_COMMIT_SUCCESS:
843                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
844                         case CTDB_TRANS2_COMMIT_TIMEOUT:
845                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
846                                 break;
847                         case CTDB_TRANS2_COMMIT_ALLFAIL:
848                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
849                                 break;
850                         }
851                 }
852
853                 if (++retries == 5) {
854                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
855                                  h->ctx->db_id, retries, (unsigned)failure_control));
856                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
857                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
858                                             tdb_null, NULL, NULL, NULL);
859                         h->ctx->transaction = NULL;
860                         talloc_free(h);
861                         ctx->transaction = NULL;
862                         return -1;                      
863                 }
864
865                 if (ctdb_replay_transaction(h) != 0) {
866                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
867                                  (unsigned)failure_control));
868                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
869                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
870                                             tdb_null, NULL, NULL, NULL);
871                         h->ctx->transaction = NULL;
872                         talloc_free(h);
873                         ctx->transaction = NULL;
874                         return -1;
875                 }
876                 goto again;
877         } else {
878                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
879         }
880
881         /* do the real commit locally */
882         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
883         if (ret != 0) {
884                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
885                          (unsigned)failure_control));
886                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
887                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
888                 h->ctx->transaction = NULL;
889                 talloc_free(h);
890                 return ret;
891         }
892
893         /* tell ctdbd that we are finished with our local commit */
894         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
895                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
896                             tdb_null, NULL, NULL, NULL);
897         h->ctx->transaction = NULL;
898         talloc_free(h);
899         return 0;
900 }
901
902
903 /*
904   cancel a transaction
905  */
906 static int db_ctdb_transaction_cancel(struct db_context *db)
907 {
908         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
909                                                         struct db_ctdb_ctx);
910         struct db_ctdb_transaction_handle *h = ctx->transaction;
911
912         if (h == NULL) {
913                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
914                 return -1;
915         }
916
917         if (h->nesting != 0) {
918                 h->nesting--;
919                 h->nested_cancel = true;
920                 return 0;
921         }
922
923         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
924
925         ctx->transaction = NULL;
926         talloc_free(h);
927         return 0;
928 }
929
930
931 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
932 {
933         struct db_ctdb_rec *crec = talloc_get_type_abort(
934                 rec->private_data, struct db_ctdb_rec);
935
936         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
937 }
938
939
940
941 static NTSTATUS db_ctdb_delete(struct db_record *rec)
942 {
943         TDB_DATA data;
944
945         /*
946          * We have to store the header with empty data. TODO: Fix the
947          * tdb-level cleanup
948          */
949
950         ZERO_STRUCT(data);
951
952         return db_ctdb_store(rec, data, 0);
953
954 }
955
956 static int db_ctdb_record_destr(struct db_record* data)
957 {
958         struct db_ctdb_rec *crec = talloc_get_type_abort(
959                 data->private_data, struct db_ctdb_rec);
960
961         DEBUG(10, (DEBUGLEVEL > 10
962                    ? "Unlocking db %u key %s\n"
963                    : "Unlocking db %u key %.20s\n",
964                    (int)crec->ctdb_ctx->db_id,
965                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
966                               data->key.dsize)));
967
968         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
969                 DEBUG(0, ("tdb_chainunlock failed\n"));
970                 return -1;
971         }
972
973         return 0;
974 }
975
976 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
977                                                TALLOC_CTX *mem_ctx,
978                                                TDB_DATA key,
979                                                bool persistent)
980 {
981         struct db_record *result;
982         struct db_ctdb_rec *crec;
983         NTSTATUS status;
984         TDB_DATA ctdb_data;
985         int migrate_attempts = 0;
986
987         if (!(result = talloc(mem_ctx, struct db_record))) {
988                 DEBUG(0, ("talloc failed\n"));
989                 return NULL;
990         }
991
992         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
993                 DEBUG(0, ("talloc failed\n"));
994                 TALLOC_FREE(result);
995                 return NULL;
996         }
997
998         result->private_data = (void *)crec;
999         crec->ctdb_ctx = ctx;
1000
1001         result->key.dsize = key.dsize;
1002         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
1003         if (result->key.dptr == NULL) {
1004                 DEBUG(0, ("talloc failed\n"));
1005                 TALLOC_FREE(result);
1006                 return NULL;
1007         }
1008
1009         /*
1010          * Do a blocking lock on the record
1011          */
1012 again:
1013
1014         if (DEBUGLEVEL >= 10) {
1015                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1016                 DEBUG(10, (DEBUGLEVEL > 10
1017                            ? "Locking db %u key %s\n"
1018                            : "Locking db %u key %.20s\n",
1019                            (int)crec->ctdb_ctx->db_id, keystr));
1020                 TALLOC_FREE(keystr);
1021         }
1022
1023         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
1024                 DEBUG(3, ("tdb_chainlock failed\n"));
1025                 TALLOC_FREE(result);
1026                 return NULL;
1027         }
1028
1029         result->store = db_ctdb_store;
1030         result->delete_rec = db_ctdb_delete;
1031         talloc_set_destructor(result, db_ctdb_record_destr);
1032
1033         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1034
1035         /*
1036          * See if we have a valid record and we are the dmaster. If so, we can
1037          * take the shortcut and just return it.
1038          */
1039
1040         if ((ctdb_data.dptr == NULL) ||
1041             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1042             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1043 #if 0
1044             || (random() % 2 != 0)
1045 #endif
1046 ) {
1047                 SAFE_FREE(ctdb_data.dptr);
1048                 tdb_chainunlock(ctx->wtdb->tdb, key);
1049                 talloc_set_destructor(result, NULL);
1050
1051                 migrate_attempts += 1;
1052
1053                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1054                            ctdb_data.dptr, ctdb_data.dptr ?
1055                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1056                            get_my_vnn()));
1057
1058                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1059                 if (!NT_STATUS_IS_OK(status)) {
1060                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1061                                   nt_errstr(status)));
1062                         TALLOC_FREE(result);
1063                         return NULL;
1064                 }
1065                 /* now its migrated, try again */
1066                 goto again;
1067         }
1068
1069         if (migrate_attempts > 10) {
1070                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1071                           migrate_attempts));
1072         }
1073
1074         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1075
1076         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1077         result->value.dptr = NULL;
1078
1079         if ((result->value.dsize != 0)
1080             && !(result->value.dptr = (uint8 *)talloc_memdup(
1081                          result, ctdb_data.dptr + sizeof(crec->header),
1082                          result->value.dsize))) {
1083                 DEBUG(0, ("talloc failed\n"));
1084                 TALLOC_FREE(result);
1085         }
1086
1087         SAFE_FREE(ctdb_data.dptr);
1088
1089         return result;
1090 }
1091
1092 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1093                                               TALLOC_CTX *mem_ctx,
1094                                               TDB_DATA key)
1095 {
1096         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1097                                                         struct db_ctdb_ctx);
1098
1099         if (ctx->transaction != NULL) {
1100                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1101         }
1102
1103         if (db->persistent) {
1104                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1105         }
1106
1107         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1108 }
1109
1110 /*
1111   fetch (unlocked, no migration) operation on ctdb
1112  */
1113 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1114                          TDB_DATA key, TDB_DATA *data)
1115 {
1116         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1117                                                         struct db_ctdb_ctx);
1118         NTSTATUS status;
1119         TDB_DATA ctdb_data;
1120
1121         if (ctx->transaction) {
1122                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1123         }
1124
1125         /* try a direct fetch */
1126         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1127
1128         /*
1129          * See if we have a valid record and we are the dmaster. If so, we can
1130          * take the shortcut and just return it.
1131          * we bypass the dmaster check for persistent databases
1132          */
1133         if ((ctdb_data.dptr != NULL) &&
1134             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1135             (db->persistent ||
1136              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1137                 /* we are the dmaster - avoid the ctdb protocol op */
1138
1139                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1140                 if (data->dsize == 0) {
1141                         SAFE_FREE(ctdb_data.dptr);
1142                         data->dptr = NULL;
1143                         return 0;
1144                 }
1145
1146                 data->dptr = (uint8 *)talloc_memdup(
1147                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1148                         data->dsize);
1149
1150                 SAFE_FREE(ctdb_data.dptr);
1151
1152                 if (data->dptr == NULL) {
1153                         return -1;
1154                 }
1155                 return 0;
1156         }
1157
1158         SAFE_FREE(ctdb_data.dptr);
1159
1160         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1161         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1162         if (!NT_STATUS_IS_OK(status)) {
1163                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1164                 return -1;
1165         }
1166
1167         return 0;
1168 }
1169
1170 struct traverse_state {
1171         struct db_context *db;
1172         int (*fn)(struct db_record *rec, void *private_data);
1173         void *private_data;
1174 };
1175
1176 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1177 {
1178         struct traverse_state *state = (struct traverse_state *)private_data;
1179         struct db_record *rec;
1180         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1181         /* we have to give them a locked record to prevent races */
1182         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1183         if (rec && rec->value.dsize > 0) {
1184                 state->fn(rec, state->private_data);
1185         }
1186         talloc_free(tmp_ctx);
1187 }
1188
1189 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1190                                         void *private_data)
1191 {
1192         struct traverse_state *state = (struct traverse_state *)private_data;
1193         struct db_record *rec;
1194         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1195         int ret = 0;
1196         /* we have to give them a locked record to prevent races */
1197         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1198         if (rec && rec->value.dsize > 0) {
1199                 ret = state->fn(rec, state->private_data);
1200         }
1201         talloc_free(tmp_ctx);
1202         return ret;
1203 }
1204
1205 static int db_ctdb_traverse(struct db_context *db,
1206                             int (*fn)(struct db_record *rec,
1207                                       void *private_data),
1208                             void *private_data)
1209 {
1210         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1211                                                         struct db_ctdb_ctx);
1212         struct traverse_state state;
1213
1214         state.db = db;
1215         state.fn = fn;
1216         state.private_data = private_data;
1217
1218         if (db->persistent) {
1219                 /* for persistent databases we don't need to do a ctdb traverse,
1220                    we can do a faster local traverse */
1221                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1222         }
1223
1224
1225         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1226         return 0;
1227 }
1228
1229 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1230 {
1231         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1232 }
1233
1234 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1235 {
1236         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1237 }
1238
1239 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1240 {
1241         struct traverse_state *state = (struct traverse_state *)private_data;
1242         struct db_record rec;
1243         rec.key = key;
1244         rec.value = data;
1245         rec.store = db_ctdb_store_deny;
1246         rec.delete_rec = db_ctdb_delete_deny;
1247         rec.private_data = state->db;
1248         state->fn(&rec, state->private_data);
1249 }
1250
1251 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1252                                         void *private_data)
1253 {
1254         struct traverse_state *state = (struct traverse_state *)private_data;
1255         struct db_record rec;
1256         rec.key = kbuf;
1257         rec.value = dbuf;
1258         rec.store = db_ctdb_store_deny;
1259         rec.delete_rec = db_ctdb_delete_deny;
1260         rec.private_data = state->db;
1261
1262         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1263                 /* a deleted record */
1264                 return 0;
1265         }
1266         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1267         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1268
1269         return state->fn(&rec, state->private_data);
1270 }
1271
1272 static int db_ctdb_traverse_read(struct db_context *db,
1273                                  int (*fn)(struct db_record *rec,
1274                                            void *private_data),
1275                                  void *private_data)
1276 {
1277         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1278                                                         struct db_ctdb_ctx);
1279         struct traverse_state state;
1280
1281         state.db = db;
1282         state.fn = fn;
1283         state.private_data = private_data;
1284
1285         if (db->persistent) {
1286                 /* for persistent databases we don't need to do a ctdb traverse,
1287                    we can do a faster local traverse */
1288                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1289         }
1290
1291         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1292         return 0;
1293 }
1294
1295 static int db_ctdb_get_seqnum(struct db_context *db)
1296 {
1297         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1298                                                         struct db_ctdb_ctx);
1299         return tdb_get_seqnum(ctx->wtdb->tdb);
1300 }
1301
1302 static int db_ctdb_get_flags(struct db_context *db)
1303 {
1304         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1305                                                         struct db_ctdb_ctx);
1306         return tdb_get_flags(ctx->wtdb->tdb);
1307 }
1308
1309 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1310                                 const char *name,
1311                                 int hash_size, int tdb_flags,
1312                                 int open_flags, mode_t mode)
1313 {
1314         struct db_context *result;
1315         struct db_ctdb_ctx *db_ctdb;
1316         char *db_path;
1317
1318         if (!lp_clustering()) {
1319                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1320                 return NULL;
1321         }
1322
1323         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1324                 DEBUG(0, ("talloc failed\n"));
1325                 TALLOC_FREE(result);
1326                 return NULL;
1327         }
1328
1329         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1330                 DEBUG(0, ("talloc failed\n"));
1331                 TALLOC_FREE(result);
1332                 return NULL;
1333         }
1334
1335         db_ctdb->transaction = NULL;
1336         db_ctdb->db = result;
1337
1338         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1339                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1340                 TALLOC_FREE(result);
1341                 return NULL;
1342         }
1343
1344         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1345
1346         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1347
1348         /* only pass through specific flags */
1349         tdb_flags &= TDB_SEQNUM;
1350
1351         /* honor permissions if user has specified O_CREAT */
1352         if (open_flags & O_CREAT) {
1353                 chmod(db_path, mode);
1354         }
1355
1356         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1357         if (db_ctdb->wtdb == NULL) {
1358                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1359                 TALLOC_FREE(result);
1360                 return NULL;
1361         }
1362         talloc_free(db_path);
1363
1364         result->private_data = (void *)db_ctdb;
1365         result->fetch_locked = db_ctdb_fetch_locked;
1366         result->fetch = db_ctdb_fetch;
1367         result->traverse = db_ctdb_traverse;
1368         result->traverse_read = db_ctdb_traverse_read;
1369         result->get_seqnum = db_ctdb_get_seqnum;
1370         result->get_flags = db_ctdb_get_flags;
1371         result->transaction_start = db_ctdb_transaction_start;
1372         result->transaction_commit = db_ctdb_transaction_commit;
1373         result->transaction_cancel = db_ctdb_transaction_cancel;
1374
1375         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1376                  name, db_ctdb->db_id));
1377
1378         return result;
1379 }
1380 #endif