s3:dbwrap_ctdb: add a function db_ctdb_ltdb_fetch()
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /**
79  * fetch a record from the tdb, separating out the header
80  * information and returning the body of the record.
81  */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TALLOC_CTX *mem_ctx,
86                                    TDB_DATA *data)
87 {
88         TDB_DATA rec;
89         NTSTATUS status;
90
91         rec = tdb_fetch(db->wtdb->tdb, key);
92         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93                 status = NT_STATUS_NOT_FOUND;
94                 if (data) {
95                         ZERO_STRUCTP(data);
96                 }
97                 if (header) {
98                         header->dmaster = (uint32_t)-1;
99                         header->rsn = 0;
100                 }
101                 goto done;
102         }
103
104         if (header) {
105                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106         }
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 if (data->dsize == 0) {
111                         data->dptr = NULL;
112                 } else {
113                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114                                         rec.dptr
115                                          + sizeof(struct ctdb_ltdb_header),
116                                         data->dsize);
117                         if (data->dptr == NULL) {
118                                 status = NT_STATUS_NO_MEMORY;
119                                 goto done;
120                         }
121                 }
122         }
123
124         status = NT_STATUS_OK;
125
126 done:
127         SAFE_FREE(rec.dptr);
128         return status;
129 }
130
131 /*
132  * Store a record together with the ctdb record header
133  * in the local copy of the database.
134  */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136                                    TDB_DATA key,
137                                    struct ctdb_ltdb_header *header,
138                                    TDB_DATA data)
139 {
140         TALLOC_CTX *tmp_ctx = talloc_stackframe();
141         TDB_DATA rec;
142         int ret;
143
144         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147         if (rec.dptr == NULL) {
148                 talloc_free(tmp_ctx);
149                 return NT_STATUS_NO_MEMORY;
150         }
151
152         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157         talloc_free(tmp_ctx);
158
159         return (ret == 0) ? NT_STATUS_OK
160                           : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162 }
163
164 /*
165   form a ctdb_rec_data record from a key/data pair
166
167   note that header may be NULL. If not NULL then it is included in the data portion
168   of the record
169  */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
171                                                   TDB_DATA key, 
172                                                   struct ctdb_ltdb_header *header,
173                                                   TDB_DATA data)
174 {
175         size_t length;
176         struct ctdb_rec_data *d;
177
178         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
179                 data.dsize + (header?sizeof(*header):0);
180         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181         if (d == NULL) {
182                 return NULL;
183         }
184         d->length = length;
185         d->reqid = reqid;
186         d->keylen = key.dsize;
187         memcpy(&d->data[0], key.dptr, key.dsize);
188         if (header) {
189                 d->datalen = data.dsize + sizeof(*header);
190                 memcpy(&d->data[key.dsize], header, sizeof(*header));
191                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192         } else {
193                 d->datalen = data.dsize;
194                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195         }
196         return d;
197 }
198
199
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
202                                                struct ctdb_marshall_buffer *m,
203                                                uint64_t db_id,
204                                                uint32_t reqid,
205                                                TDB_DATA key,
206                                                struct ctdb_ltdb_header *header,
207                                                TDB_DATA data)
208 {
209         struct ctdb_rec_data *r;
210         size_t m_size, r_size;
211         struct ctdb_marshall_buffer *m2 = NULL;
212
213         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214         if (r == NULL) {
215                 talloc_free(m);
216                 return NULL;
217         }
218
219         if (m == NULL) {
220                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222                 if (m == NULL) {
223                         goto done;
224                 }
225                 m->db_id = db_id;
226         }
227
228         m_size = talloc_get_size(m);
229         r_size = talloc_get_size(r);
230
231         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232                 mem_ctx, m,  m_size + r_size);
233         if (m2 == NULL) {
234                 talloc_free(m);
235                 goto done;
236         }
237
238         memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240         m2->count++;
241
242 done:
243         talloc_free(r);
244         return m2;
245 }
246
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249 {
250         TDB_DATA data;
251         data.dptr = (uint8_t *)m;
252         data.dsize = talloc_get_size(m);
253         return data;
254 }
255
256 /* 
257    loop over a marshalling buffer 
258
259      - pass r==NULL to start
260      - loop the number of times indicated by m->count
261 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263                                                      uint32_t *reqid,
264                                                      struct ctdb_ltdb_header *header,
265                                                      TDB_DATA *key, TDB_DATA *data)
266 {
267         if (r == NULL) {
268                 r = (struct ctdb_rec_data *)&m->data[0];
269         } else {
270                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271         }
272
273         if (reqid != NULL) {
274                 *reqid = r->reqid;
275         }
276
277         if (key != NULL) {
278                 key->dptr   = &r->data[0];
279                 key->dsize  = r->keylen;
280         }
281         if (data != NULL) {
282                 data->dptr  = &r->data[r->keylen];
283                 data->dsize = r->datalen;
284                 if (header != NULL) {
285                         data->dptr += sizeof(*header);
286                         data->dsize -= sizeof(*header);
287                 }
288         }
289
290         if (header != NULL) {
291                 if (r->datalen < sizeof(*header)) {
292                         return NULL;
293                 }
294                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295         }
296
297         return r;
298 }
299
300
301
302 /**
303  * CTDB transaction destructor
304  */
305 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
306 {
307         tdb_transaction_cancel(h->ctx->wtdb->tdb);
308         return 0;
309 }
310
311 /**
312  * start a transaction on a ctdb database:
313  * - lock the transaction lock key
314  * - start the tdb transaction
315  */
316 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
317 {
318         struct db_record *rh;
319         TDB_DATA key;
320         TALLOC_CTX *tmp_ctx;
321         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
322         int ret;
323         struct db_ctdb_ctx *ctx = h->ctx;
324         TDB_DATA data;
325
326         key.dptr = (uint8_t *)discard_const(keyname);
327         key.dsize = strlen(keyname);
328
329 again:
330         tmp_ctx = talloc_new(h);
331
332         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
333         if (rh == NULL) {
334                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
335                 talloc_free(tmp_ctx);
336                 return -1;
337         }
338         talloc_free(rh);
339
340         ret = tdb_transaction_start(ctx->wtdb->tdb);
341         if (ret != 0) {
342                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
343                 talloc_free(tmp_ctx);
344                 return -1;
345         }
346
347         data = tdb_fetch(ctx->wtdb->tdb, key);
348         if ((data.dptr == NULL) ||
349             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
350             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
351                 SAFE_FREE(data.dptr);
352                 tdb_transaction_cancel(ctx->wtdb->tdb);
353                 talloc_free(tmp_ctx);
354                 goto again;
355         }
356
357         SAFE_FREE(data.dptr);
358         talloc_free(tmp_ctx);
359
360         return 0;
361 }
362
363
364 /**
365  * CTDB dbwrap API: transaction_start function
366  * starts a transaction on a persistent database
367  */
368 static int db_ctdb_transaction_start(struct db_context *db)
369 {
370         struct db_ctdb_transaction_handle *h;
371         int ret;
372         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
373                                                         struct db_ctdb_ctx);
374
375         if (!db->persistent) {
376                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
377                          ctx->db_id));
378                 return -1;
379         }
380
381         if (ctx->transaction) {
382                 ctx->transaction->nesting++;
383                 return 0;
384         }
385
386         h = talloc_zero(db, struct db_ctdb_transaction_handle);
387         if (h == NULL) {
388                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
389                 return -1;
390         }
391
392         h->ctx = ctx;
393
394         ret = db_ctdb_transaction_fetch_start(h);
395         if (ret != 0) {
396                 talloc_free(h);
397                 return -1;
398         }
399
400         talloc_set_destructor(h, db_ctdb_transaction_destructor);
401
402         ctx->transaction = h;
403
404         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
405
406         return 0;
407 }
408
409
410
411 /*
412   fetch a record inside a transaction
413  */
414 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
415                                      TALLOC_CTX *mem_ctx, 
416                                      TDB_DATA key, TDB_DATA *data)
417 {
418         struct db_ctdb_transaction_handle *h = db->transaction;
419
420         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
421
422         if (data->dptr != NULL) {
423                 uint8_t *oldptr = (uint8_t *)data->dptr;
424                 data->dsize -= sizeof(struct ctdb_ltdb_header);
425                 if (data->dsize == 0) {
426                         data->dptr = NULL;
427                 } else {
428                         data->dptr = (uint8 *)
429                                 talloc_memdup(
430                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
431                                         data->dsize);
432                 }
433                 SAFE_FREE(oldptr);
434                 if (data->dptr == NULL && data->dsize != 0) {
435                         return -1;
436                 }
437         }
438
439         if (!h->in_replay) {
440                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
441                 if (h->m_all == NULL) {
442                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
443                         data->dsize = 0;
444                         talloc_free(data->dptr);
445                         return -1;
446                 }
447         }
448
449         return 0;
450 }
451
452
453 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
454 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
455
456 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
457                                                           TALLOC_CTX *mem_ctx,
458                                                           TDB_DATA key)
459 {
460         struct db_record *result;
461         TDB_DATA ctdb_data;
462
463         if (!(result = talloc(mem_ctx, struct db_record))) {
464                 DEBUG(0, ("talloc failed\n"));
465                 return NULL;
466         }
467
468         result->private_data = ctx->transaction;
469
470         result->key.dsize = key.dsize;
471         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
472         if (result->key.dptr == NULL) {
473                 DEBUG(0, ("talloc failed\n"));
474                 TALLOC_FREE(result);
475                 return NULL;
476         }
477
478         result->store = db_ctdb_store_transaction;
479         result->delete_rec = db_ctdb_delete_transaction;
480
481         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
482         if (ctdb_data.dptr == NULL) {
483                 /* create the record */
484                 result->value = tdb_null;
485                 return result;
486         }
487
488         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
489         result->value.dptr = NULL;
490
491         if ((result->value.dsize != 0)
492             && !(result->value.dptr = (uint8 *)talloc_memdup(
493                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
494                          result->value.dsize))) {
495                 DEBUG(0, ("talloc failed\n"));
496                 TALLOC_FREE(result);
497         }
498
499         SAFE_FREE(ctdb_data.dptr);
500
501         return result;
502 }
503
504 static int db_ctdb_record_destructor(struct db_record **recp)
505 {
506         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
507         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
508                 rec->private_data, struct db_ctdb_transaction_handle);
509         int ret = h->ctx->db->transaction_commit(h->ctx->db);
510         if (ret != 0) {
511                 DEBUG(0,(__location__ " transaction_commit failed\n"));
512         }
513         return 0;
514 }
515
516 /*
517   auto-create a transaction for persistent databases
518  */
519 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
520                                                          TALLOC_CTX *mem_ctx,
521                                                          TDB_DATA key)
522 {
523         int res;
524         struct db_record *rec, **recp;
525
526         res = db_ctdb_transaction_start(ctx->db);
527         if (res == -1) {
528                 return NULL;
529         }
530
531         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
532         if (rec == NULL) {
533                 ctx->db->transaction_cancel(ctx->db);           
534                 return NULL;
535         }
536
537         /* destroy this transaction when we release the lock */
538         recp = talloc(rec, struct db_record *);
539         if (recp == NULL) {
540                 ctx->db->transaction_cancel(ctx->db);
541                 talloc_free(rec);
542                 return NULL;
543         }
544         *recp = rec;
545         talloc_set_destructor(recp, db_ctdb_record_destructor);
546         return rec;
547 }
548
549
550 /*
551   stores a record inside a transaction
552  */
553 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
554                                      TDB_DATA key, TDB_DATA data)
555 {
556         TALLOC_CTX *tmp_ctx = talloc_new(h);
557         int ret;
558         TDB_DATA rec;
559         struct ctdb_ltdb_header header;
560         NTSTATUS status;
561
562         /* we need the header so we can update the RSN */
563         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
564         if (rec.dptr == NULL) {
565                 /* the record doesn't exist - create one with us as dmaster.
566                    This is only safe because we are in a transaction and this
567                    is a persistent database */
568                 ZERO_STRUCT(header);
569         } else {
570                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
571                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
572                 /* a special case, we are writing the same data that is there now */
573                 if (data.dsize == rec.dsize &&
574                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
575                         SAFE_FREE(rec.dptr);
576                         talloc_free(tmp_ctx);
577                         return 0;
578                 }
579                 SAFE_FREE(rec.dptr);
580         }
581
582         header.dmaster = get_my_vnn();
583         header.rsn++;
584
585         if (!h->in_replay) {
586                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
587                 if (h->m_all == NULL) {
588                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
589                         talloc_free(tmp_ctx);
590                         return -1;
591                 }
592         }
593
594         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
595         if (h->m_write == NULL) {
596                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
597                 talloc_free(tmp_ctx);
598                 return -1;
599         }
600
601         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
602         if (NT_STATUS_IS_OK(status)) {
603                 ret = 0;
604         } else {
605                 ret = -1;
606         }
607
608         talloc_free(tmp_ctx);
609
610         return ret;
611 }
612
613
614 /* 
615    a record store inside a transaction
616  */
617 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
618 {
619         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
620                 rec->private_data, struct db_ctdb_transaction_handle);
621         int ret;
622
623         ret = db_ctdb_transaction_store(h, rec->key, data);
624         if (ret != 0) {
625                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
626         }
627         return NT_STATUS_OK;
628 }
629
630 /* 
631    a record delete inside a transaction
632  */
633 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
634 {
635         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
636                 rec->private_data, struct db_ctdb_transaction_handle);
637         int ret;
638
639         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
640         if (ret != 0) {
641                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
642         }
643         return NT_STATUS_OK;
644 }
645
646
647 /*
648   replay a transaction
649  */
650 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
651 {
652         int ret, i;
653         struct ctdb_rec_data *rec = NULL;
654
655         h->in_replay = true;
656         talloc_free(h->m_write);
657         h->m_write = NULL;
658
659         ret = db_ctdb_transaction_fetch_start(h);
660         if (ret != 0) {
661                 return ret;
662         }
663
664         for (i=0;i<h->m_all->count;i++) {
665                 TDB_DATA key, data;
666
667                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
668                 if (rec == NULL) {
669                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
670                         goto failed;
671                 }
672
673                 if (rec->reqid == 0) {
674                         /* its a store */
675                         if (db_ctdb_transaction_store(h, key, data) != 0) {
676                                 goto failed;
677                         }
678                 } else {
679                         TDB_DATA data2;
680                         TALLOC_CTX *tmp_ctx = talloc_new(h);
681
682                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
683                                 talloc_free(tmp_ctx);
684                                 goto failed;
685                         }
686                         if (data2.dsize != data.dsize ||
687                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
688                                 /* the record has changed on us - we have to give up */
689                                 talloc_free(tmp_ctx);
690                                 goto failed;
691                         }
692                         talloc_free(tmp_ctx);
693                 }
694         }
695
696         return 0;
697
698 failed:
699         tdb_transaction_cancel(h->ctx->wtdb->tdb);
700         return -1;
701 }
702
703
704 /*
705   commit a transaction
706  */
707 static int db_ctdb_transaction_commit(struct db_context *db)
708 {
709         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
710                                                         struct db_ctdb_ctx);
711         NTSTATUS rets;
712         int ret;
713         int status;
714         int retries = 0;
715         struct db_ctdb_transaction_handle *h = ctx->transaction;
716         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
717
718         if (h == NULL) {
719                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
720                 return -1;
721         }
722
723         if (h->nested_cancel) {
724                 db->transaction_cancel(db);
725                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
726                 return -1;
727         }
728
729         if (h->nesting != 0) {
730                 h->nesting--;
731                 return 0;
732         }
733
734         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
735
736         talloc_set_destructor(h, NULL);
737
738         /* our commit strategy is quite complex.
739
740            - we first try to commit the changes to all other nodes
741
742            - if that works, then we commit locally and we are done
743
744            - if a commit on another node fails, then we need to cancel
745              the transaction, then restart the transaction (thus
746              opening a window of time for a pending recovery to
747              complete), then replay the transaction, checking all the
748              reads and writes (checking that reads give the same data,
749              and writes succeed). Then we retry the transaction to the
750              other nodes
751         */
752
753 again:
754         if (h->m_write == NULL) {
755                 /* no changes were made, potentially after a retry */
756                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
757                 talloc_free(h);
758                 ctx->transaction = NULL;
759                 return 0;
760         }
761
762         /* tell ctdbd to commit to the other nodes */
763         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
764                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
765                                    h->ctx->db_id, 0,
766                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
767         if (!NT_STATUS_IS_OK(rets) || status != 0) {
768                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
769                 sleep(1);
770
771                 if (!NT_STATUS_IS_OK(rets)) {
772                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
773                 } else {
774                         /* work out what error code we will give if we 
775                            have to fail the operation */
776                         switch ((enum ctdb_trans2_commit_error)status) {
777                         case CTDB_TRANS2_COMMIT_SUCCESS:
778                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
779                         case CTDB_TRANS2_COMMIT_TIMEOUT:
780                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
781                                 break;
782                         case CTDB_TRANS2_COMMIT_ALLFAIL:
783                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
784                                 break;
785                         }
786                 }
787
788                 if (++retries == 5) {
789                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
790                                  h->ctx->db_id, retries, (unsigned)failure_control));
791                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
792                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
793                                             tdb_null, NULL, NULL, NULL);
794                         h->ctx->transaction = NULL;
795                         talloc_free(h);
796                         ctx->transaction = NULL;
797                         return -1;                      
798                 }
799
800                 if (ctdb_replay_transaction(h) != 0) {
801                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
802                                  (unsigned)failure_control));
803                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
804                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
805                                             tdb_null, NULL, NULL, NULL);
806                         h->ctx->transaction = NULL;
807                         talloc_free(h);
808                         ctx->transaction = NULL;
809                         return -1;
810                 }
811                 goto again;
812         } else {
813                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
814         }
815
816         /* do the real commit locally */
817         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
818         if (ret != 0) {
819                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
820                          (unsigned)failure_control));
821                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
822                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
823                 h->ctx->transaction = NULL;
824                 talloc_free(h);
825                 return ret;
826         }
827
828         /* tell ctdbd that we are finished with our local commit */
829         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
830                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
831                             tdb_null, NULL, NULL, NULL);
832         h->ctx->transaction = NULL;
833         talloc_free(h);
834         return 0;
835 }
836
837
838 /*
839   cancel a transaction
840  */
841 static int db_ctdb_transaction_cancel(struct db_context *db)
842 {
843         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
844                                                         struct db_ctdb_ctx);
845         struct db_ctdb_transaction_handle *h = ctx->transaction;
846
847         if (h == NULL) {
848                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
849                 return -1;
850         }
851
852         if (h->nesting != 0) {
853                 h->nesting--;
854                 h->nested_cancel = true;
855                 return 0;
856         }
857
858         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
859
860         ctx->transaction = NULL;
861         talloc_free(h);
862         return 0;
863 }
864
865
866 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
867 {
868         struct db_ctdb_rec *crec = talloc_get_type_abort(
869                 rec->private_data, struct db_ctdb_rec);
870
871         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
872 }
873
874
875
876 static NTSTATUS db_ctdb_delete(struct db_record *rec)
877 {
878         TDB_DATA data;
879
880         /*
881          * We have to store the header with empty data. TODO: Fix the
882          * tdb-level cleanup
883          */
884
885         ZERO_STRUCT(data);
886
887         return db_ctdb_store(rec, data, 0);
888
889 }
890
891 static int db_ctdb_record_destr(struct db_record* data)
892 {
893         struct db_ctdb_rec *crec = talloc_get_type_abort(
894                 data->private_data, struct db_ctdb_rec);
895
896         DEBUG(10, (DEBUGLEVEL > 10
897                    ? "Unlocking db %u key %s\n"
898                    : "Unlocking db %u key %.20s\n",
899                    (int)crec->ctdb_ctx->db_id,
900                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
901                               data->key.dsize)));
902
903         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
904                 DEBUG(0, ("tdb_chainunlock failed\n"));
905                 return -1;
906         }
907
908         return 0;
909 }
910
911 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
912                                                TALLOC_CTX *mem_ctx,
913                                                TDB_DATA key,
914                                                bool persistent)
915 {
916         struct db_record *result;
917         struct db_ctdb_rec *crec;
918         NTSTATUS status;
919         TDB_DATA ctdb_data;
920         int migrate_attempts = 0;
921
922         if (!(result = talloc(mem_ctx, struct db_record))) {
923                 DEBUG(0, ("talloc failed\n"));
924                 return NULL;
925         }
926
927         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
928                 DEBUG(0, ("talloc failed\n"));
929                 TALLOC_FREE(result);
930                 return NULL;
931         }
932
933         result->private_data = (void *)crec;
934         crec->ctdb_ctx = ctx;
935
936         result->key.dsize = key.dsize;
937         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
938         if (result->key.dptr == NULL) {
939                 DEBUG(0, ("talloc failed\n"));
940                 TALLOC_FREE(result);
941                 return NULL;
942         }
943
944         /*
945          * Do a blocking lock on the record
946          */
947 again:
948
949         if (DEBUGLEVEL >= 10) {
950                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
951                 DEBUG(10, (DEBUGLEVEL > 10
952                            ? "Locking db %u key %s\n"
953                            : "Locking db %u key %.20s\n",
954                            (int)crec->ctdb_ctx->db_id, keystr));
955                 TALLOC_FREE(keystr);
956         }
957
958         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
959                 DEBUG(3, ("tdb_chainlock failed\n"));
960                 TALLOC_FREE(result);
961                 return NULL;
962         }
963
964         result->store = db_ctdb_store;
965         result->delete_rec = db_ctdb_delete;
966         talloc_set_destructor(result, db_ctdb_record_destr);
967
968         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
969
970         /*
971          * See if we have a valid record and we are the dmaster. If so, we can
972          * take the shortcut and just return it.
973          */
974
975         if ((ctdb_data.dptr == NULL) ||
976             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
977             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
978 #if 0
979             || (random() % 2 != 0)
980 #endif
981 ) {
982                 SAFE_FREE(ctdb_data.dptr);
983                 tdb_chainunlock(ctx->wtdb->tdb, key);
984                 talloc_set_destructor(result, NULL);
985
986                 migrate_attempts += 1;
987
988                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
989                            ctdb_data.dptr, ctdb_data.dptr ?
990                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
991                            get_my_vnn()));
992
993                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
994                 if (!NT_STATUS_IS_OK(status)) {
995                         DEBUG(5, ("ctdb_migrate failed: %s\n",
996                                   nt_errstr(status)));
997                         TALLOC_FREE(result);
998                         return NULL;
999                 }
1000                 /* now its migrated, try again */
1001                 goto again;
1002         }
1003
1004         if (migrate_attempts > 10) {
1005                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1006                           migrate_attempts));
1007         }
1008
1009         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1010
1011         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1012         result->value.dptr = NULL;
1013
1014         if ((result->value.dsize != 0)
1015             && !(result->value.dptr = (uint8 *)talloc_memdup(
1016                          result, ctdb_data.dptr + sizeof(crec->header),
1017                          result->value.dsize))) {
1018                 DEBUG(0, ("talloc failed\n"));
1019                 TALLOC_FREE(result);
1020         }
1021
1022         SAFE_FREE(ctdb_data.dptr);
1023
1024         return result;
1025 }
1026
1027 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1028                                               TALLOC_CTX *mem_ctx,
1029                                               TDB_DATA key)
1030 {
1031         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1032                                                         struct db_ctdb_ctx);
1033
1034         if (ctx->transaction != NULL) {
1035                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1036         }
1037
1038         if (db->persistent) {
1039                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1040         }
1041
1042         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1043 }
1044
1045 /*
1046   fetch (unlocked, no migration) operation on ctdb
1047  */
1048 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1049                          TDB_DATA key, TDB_DATA *data)
1050 {
1051         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1052                                                         struct db_ctdb_ctx);
1053         NTSTATUS status;
1054         TDB_DATA ctdb_data;
1055
1056         if (ctx->transaction) {
1057                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1058         }
1059
1060         /* try a direct fetch */
1061         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1062
1063         /*
1064          * See if we have a valid record and we are the dmaster. If so, we can
1065          * take the shortcut and just return it.
1066          * we bypass the dmaster check for persistent databases
1067          */
1068         if ((ctdb_data.dptr != NULL) &&
1069             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1070             (db->persistent ||
1071              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1072                 /* we are the dmaster - avoid the ctdb protocol op */
1073
1074                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1075                 if (data->dsize == 0) {
1076                         SAFE_FREE(ctdb_data.dptr);
1077                         data->dptr = NULL;
1078                         return 0;
1079                 }
1080
1081                 data->dptr = (uint8 *)talloc_memdup(
1082                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1083                         data->dsize);
1084
1085                 SAFE_FREE(ctdb_data.dptr);
1086
1087                 if (data->dptr == NULL) {
1088                         return -1;
1089                 }
1090                 return 0;
1091         }
1092
1093         SAFE_FREE(ctdb_data.dptr);
1094
1095         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1096         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1097         if (!NT_STATUS_IS_OK(status)) {
1098                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1099                 return -1;
1100         }
1101
1102         return 0;
1103 }
1104
1105 struct traverse_state {
1106         struct db_context *db;
1107         int (*fn)(struct db_record *rec, void *private_data);
1108         void *private_data;
1109 };
1110
1111 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1112 {
1113         struct traverse_state *state = (struct traverse_state *)private_data;
1114         struct db_record *rec;
1115         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1116         /* we have to give them a locked record to prevent races */
1117         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1118         if (rec && rec->value.dsize > 0) {
1119                 state->fn(rec, state->private_data);
1120         }
1121         talloc_free(tmp_ctx);
1122 }
1123
1124 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1125                                         void *private_data)
1126 {
1127         struct traverse_state *state = (struct traverse_state *)private_data;
1128         struct db_record *rec;
1129         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1130         int ret = 0;
1131         /* we have to give them a locked record to prevent races */
1132         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1133         if (rec && rec->value.dsize > 0) {
1134                 ret = state->fn(rec, state->private_data);
1135         }
1136         talloc_free(tmp_ctx);
1137         return ret;
1138 }
1139
1140 static int db_ctdb_traverse(struct db_context *db,
1141                             int (*fn)(struct db_record *rec,
1142                                       void *private_data),
1143                             void *private_data)
1144 {
1145         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1146                                                         struct db_ctdb_ctx);
1147         struct traverse_state state;
1148
1149         state.db = db;
1150         state.fn = fn;
1151         state.private_data = private_data;
1152
1153         if (db->persistent) {
1154                 /* for persistent databases we don't need to do a ctdb traverse,
1155                    we can do a faster local traverse */
1156                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1157         }
1158
1159
1160         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1161         return 0;
1162 }
1163
1164 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1165 {
1166         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1167 }
1168
1169 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1170 {
1171         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1172 }
1173
1174 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1175 {
1176         struct traverse_state *state = (struct traverse_state *)private_data;
1177         struct db_record rec;
1178         rec.key = key;
1179         rec.value = data;
1180         rec.store = db_ctdb_store_deny;
1181         rec.delete_rec = db_ctdb_delete_deny;
1182         rec.private_data = state->db;
1183         state->fn(&rec, state->private_data);
1184 }
1185
1186 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1187                                         void *private_data)
1188 {
1189         struct traverse_state *state = (struct traverse_state *)private_data;
1190         struct db_record rec;
1191         rec.key = kbuf;
1192         rec.value = dbuf;
1193         rec.store = db_ctdb_store_deny;
1194         rec.delete_rec = db_ctdb_delete_deny;
1195         rec.private_data = state->db;
1196
1197         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1198                 /* a deleted record */
1199                 return 0;
1200         }
1201         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1202         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1203
1204         return state->fn(&rec, state->private_data);
1205 }
1206
1207 static int db_ctdb_traverse_read(struct db_context *db,
1208                                  int (*fn)(struct db_record *rec,
1209                                            void *private_data),
1210                                  void *private_data)
1211 {
1212         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1213                                                         struct db_ctdb_ctx);
1214         struct traverse_state state;
1215
1216         state.db = db;
1217         state.fn = fn;
1218         state.private_data = private_data;
1219
1220         if (db->persistent) {
1221                 /* for persistent databases we don't need to do a ctdb traverse,
1222                    we can do a faster local traverse */
1223                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1224         }
1225
1226         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1227         return 0;
1228 }
1229
1230 static int db_ctdb_get_seqnum(struct db_context *db)
1231 {
1232         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1233                                                         struct db_ctdb_ctx);
1234         return tdb_get_seqnum(ctx->wtdb->tdb);
1235 }
1236
1237 static int db_ctdb_get_flags(struct db_context *db)
1238 {
1239         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1240                                                         struct db_ctdb_ctx);
1241         return tdb_get_flags(ctx->wtdb->tdb);
1242 }
1243
1244 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1245                                 const char *name,
1246                                 int hash_size, int tdb_flags,
1247                                 int open_flags, mode_t mode)
1248 {
1249         struct db_context *result;
1250         struct db_ctdb_ctx *db_ctdb;
1251         char *db_path;
1252
1253         if (!lp_clustering()) {
1254                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1255                 return NULL;
1256         }
1257
1258         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1259                 DEBUG(0, ("talloc failed\n"));
1260                 TALLOC_FREE(result);
1261                 return NULL;
1262         }
1263
1264         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1265                 DEBUG(0, ("talloc failed\n"));
1266                 TALLOC_FREE(result);
1267                 return NULL;
1268         }
1269
1270         db_ctdb->transaction = NULL;
1271         db_ctdb->db = result;
1272
1273         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1274                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1275                 TALLOC_FREE(result);
1276                 return NULL;
1277         }
1278
1279         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1280
1281         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1282
1283         /* only pass through specific flags */
1284         tdb_flags &= TDB_SEQNUM;
1285
1286         /* honor permissions if user has specified O_CREAT */
1287         if (open_flags & O_CREAT) {
1288                 chmod(db_path, mode);
1289         }
1290
1291         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1292         if (db_ctdb->wtdb == NULL) {
1293                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1294                 TALLOC_FREE(result);
1295                 return NULL;
1296         }
1297         talloc_free(db_path);
1298
1299         result->private_data = (void *)db_ctdb;
1300         result->fetch_locked = db_ctdb_fetch_locked;
1301         result->fetch = db_ctdb_fetch;
1302         result->traverse = db_ctdb_traverse;
1303         result->traverse_read = db_ctdb_traverse_read;
1304         result->get_seqnum = db_ctdb_get_seqnum;
1305         result->get_flags = db_ctdb_get_flags;
1306         result->transaction_start = db_ctdb_transaction_start;
1307         result->transaction_commit = db_ctdb_transaction_commit;
1308         result->transaction_cancel = db_ctdb_transaction_cancel;
1309
1310         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1311                  name, db_ctdb->db_id));
1312
1313         return result;
1314 }
1315 #endif