s3:dbwrap_ctdb: fix a race in starting concurrent transactions on a single node
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /**
79  * fetch a record from the tdb, separating out the header
80  * information and returning the body of the record.
81  */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TALLOC_CTX *mem_ctx,
86                                    TDB_DATA *data)
87 {
88         TDB_DATA rec;
89         NTSTATUS status;
90
91         rec = tdb_fetch(db->wtdb->tdb, key);
92         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93                 status = NT_STATUS_NOT_FOUND;
94                 if (data) {
95                         ZERO_STRUCTP(data);
96                 }
97                 if (header) {
98                         header->dmaster = (uint32_t)-1;
99                         header->rsn = 0;
100                 }
101                 goto done;
102         }
103
104         if (header) {
105                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106         }
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 if (data->dsize == 0) {
111                         data->dptr = NULL;
112                 } else {
113                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114                                         rec.dptr
115                                          + sizeof(struct ctdb_ltdb_header),
116                                         data->dsize);
117                         if (data->dptr == NULL) {
118                                 status = NT_STATUS_NO_MEMORY;
119                                 goto done;
120                         }
121                 }
122         }
123
124         status = NT_STATUS_OK;
125
126 done:
127         SAFE_FREE(rec.dptr);
128         return status;
129 }
130
131 /*
132  * Store a record together with the ctdb record header
133  * in the local copy of the database.
134  */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136                                    TDB_DATA key,
137                                    struct ctdb_ltdb_header *header,
138                                    TDB_DATA data)
139 {
140         TALLOC_CTX *tmp_ctx = talloc_stackframe();
141         TDB_DATA rec;
142         int ret;
143
144         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147         if (rec.dptr == NULL) {
148                 talloc_free(tmp_ctx);
149                 return NT_STATUS_NO_MEMORY;
150         }
151
152         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157         talloc_free(tmp_ctx);
158
159         return (ret == 0) ? NT_STATUS_OK
160                           : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162 }
163
164 /*
165   form a ctdb_rec_data record from a key/data pair
166
167   note that header may be NULL. If not NULL then it is included in the data portion
168   of the record
169  */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
171                                                   TDB_DATA key, 
172                                                   struct ctdb_ltdb_header *header,
173                                                   TDB_DATA data)
174 {
175         size_t length;
176         struct ctdb_rec_data *d;
177
178         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
179                 data.dsize + (header?sizeof(*header):0);
180         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181         if (d == NULL) {
182                 return NULL;
183         }
184         d->length = length;
185         d->reqid = reqid;
186         d->keylen = key.dsize;
187         memcpy(&d->data[0], key.dptr, key.dsize);
188         if (header) {
189                 d->datalen = data.dsize + sizeof(*header);
190                 memcpy(&d->data[key.dsize], header, sizeof(*header));
191                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192         } else {
193                 d->datalen = data.dsize;
194                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195         }
196         return d;
197 }
198
199
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
202                                                struct ctdb_marshall_buffer *m,
203                                                uint64_t db_id,
204                                                uint32_t reqid,
205                                                TDB_DATA key,
206                                                struct ctdb_ltdb_header *header,
207                                                TDB_DATA data)
208 {
209         struct ctdb_rec_data *r;
210         size_t m_size, r_size;
211         struct ctdb_marshall_buffer *m2 = NULL;
212
213         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214         if (r == NULL) {
215                 talloc_free(m);
216                 return NULL;
217         }
218
219         if (m == NULL) {
220                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222                 if (m == NULL) {
223                         goto done;
224                 }
225                 m->db_id = db_id;
226         }
227
228         m_size = talloc_get_size(m);
229         r_size = talloc_get_size(r);
230
231         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232                 mem_ctx, m,  m_size + r_size);
233         if (m2 == NULL) {
234                 talloc_free(m);
235                 goto done;
236         }
237
238         memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240         m2->count++;
241
242 done:
243         talloc_free(r);
244         return m2;
245 }
246
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249 {
250         TDB_DATA data;
251         data.dptr = (uint8_t *)m;
252         data.dsize = talloc_get_size(m);
253         return data;
254 }
255
256 /* 
257    loop over a marshalling buffer 
258
259      - pass r==NULL to start
260      - loop the number of times indicated by m->count
261 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263                                                      uint32_t *reqid,
264                                                      struct ctdb_ltdb_header *header,
265                                                      TDB_DATA *key, TDB_DATA *data)
266 {
267         if (r == NULL) {
268                 r = (struct ctdb_rec_data *)&m->data[0];
269         } else {
270                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271         }
272
273         if (reqid != NULL) {
274                 *reqid = r->reqid;
275         }
276
277         if (key != NULL) {
278                 key->dptr   = &r->data[0];
279                 key->dsize  = r->keylen;
280         }
281         if (data != NULL) {
282                 data->dptr  = &r->data[r->keylen];
283                 data->dsize = r->datalen;
284                 if (header != NULL) {
285                         data->dptr += sizeof(*header);
286                         data->dsize -= sizeof(*header);
287                 }
288         }
289
290         if (header != NULL) {
291                 if (r->datalen < sizeof(*header)) {
292                         return NULL;
293                 }
294                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295         }
296
297         return r;
298 }
299
300
301
302 /**
303  * CTDB transaction destructor
304  */
305 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
306 {
307         tdb_transaction_cancel(h->ctx->wtdb->tdb);
308         return 0;
309 }
310
311 /**
312  * start a transaction on a ctdb database:
313  * - lock the transaction lock key
314  * - start the tdb transaction
315  */
316 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
317 {
318         struct db_record *rh;
319         struct db_ctdb_rec *crec;
320         TDB_DATA key;
321         TALLOC_CTX *tmp_ctx;
322         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
323         int ret;
324         struct db_ctdb_ctx *ctx = h->ctx;
325         TDB_DATA data;
326         pid_t pid;
327         NTSTATUS status;
328         struct ctdb_ltdb_header header;
329
330         key.dptr = (uint8_t *)discard_const(keyname);
331         key.dsize = strlen(keyname);
332
333 again:
334         tmp_ctx = talloc_new(h);
335
336         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
337         if (rh == NULL) {
338                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342         crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
343
344         /*
345          * store the pid in the database:
346          * it is not enought that the node is dmaster...
347          */
348         pid = getpid();
349         data.dptr = (unsigned char *)&pid;
350         data.dsize = sizeof(pid_t);
351         status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
352         if (!NT_STATUS_IS_OK(status)) {
353                 DEBUG(0, (__location__ " Failed to store pid in transaction "
354                           "record: %s\n", nt_errstr(status)));
355                 talloc_free(tmp_ctx);
356                 return -1;
357         }
358
359         talloc_free(rh);
360
361         ret = tdb_transaction_start(ctx->wtdb->tdb);
362         if (ret != 0) {
363                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
364                 talloc_free(tmp_ctx);
365                 return -1;
366         }
367
368         status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
369         if (!NT_STATUS_IS_OK(status) || header.dmaster != get_my_vnn()) {
370                 tdb_transaction_cancel(ctx->wtdb->tdb);
371                 talloc_free(tmp_ctx);
372                 goto again;
373         }
374
375         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
376                 tdb_transaction_cancel(ctx->wtdb->tdb);
377                 talloc_free(tmp_ctx);
378                 goto again;
379         }
380
381         talloc_free(tmp_ctx);
382
383         return 0;
384 }
385
386
387 /**
388  * CTDB dbwrap API: transaction_start function
389  * starts a transaction on a persistent database
390  */
391 static int db_ctdb_transaction_start(struct db_context *db)
392 {
393         struct db_ctdb_transaction_handle *h;
394         int ret;
395         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
396                                                         struct db_ctdb_ctx);
397
398         if (!db->persistent) {
399                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
400                          ctx->db_id));
401                 return -1;
402         }
403
404         if (ctx->transaction) {
405                 ctx->transaction->nesting++;
406                 return 0;
407         }
408
409         h = talloc_zero(db, struct db_ctdb_transaction_handle);
410         if (h == NULL) {
411                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
412                 return -1;
413         }
414
415         h->ctx = ctx;
416
417         ret = db_ctdb_transaction_fetch_start(h);
418         if (ret != 0) {
419                 talloc_free(h);
420                 return -1;
421         }
422
423         talloc_set_destructor(h, db_ctdb_transaction_destructor);
424
425         ctx->transaction = h;
426
427         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
428
429         return 0;
430 }
431
432
433
434 /*
435   fetch a record inside a transaction
436  */
437 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
438                                      TALLOC_CTX *mem_ctx, 
439                                      TDB_DATA key, TDB_DATA *data)
440 {
441         struct db_ctdb_transaction_handle *h = db->transaction;
442         NTSTATUS status;
443
444         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
445
446         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
447                 *data = tdb_null;
448         } else if (!NT_STATUS_IS_OK(status)) {
449                 return -1;
450         }
451
452         if (!h->in_replay) {
453                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
454                 if (h->m_all == NULL) {
455                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
456                         data->dsize = 0;
457                         talloc_free(data->dptr);
458                         return -1;
459                 }
460         }
461
462         return 0;
463 }
464
465
466 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
467 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
468
469 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
470                                                           TALLOC_CTX *mem_ctx,
471                                                           TDB_DATA key)
472 {
473         struct db_record *result;
474         TDB_DATA ctdb_data;
475
476         if (!(result = talloc(mem_ctx, struct db_record))) {
477                 DEBUG(0, ("talloc failed\n"));
478                 return NULL;
479         }
480
481         result->private_data = ctx->transaction;
482
483         result->key.dsize = key.dsize;
484         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
485         if (result->key.dptr == NULL) {
486                 DEBUG(0, ("talloc failed\n"));
487                 TALLOC_FREE(result);
488                 return NULL;
489         }
490
491         result->store = db_ctdb_store_transaction;
492         result->delete_rec = db_ctdb_delete_transaction;
493
494         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
495         if (ctdb_data.dptr == NULL) {
496                 /* create the record */
497                 result->value = tdb_null;
498                 return result;
499         }
500
501         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
502         result->value.dptr = NULL;
503
504         if ((result->value.dsize != 0)
505             && !(result->value.dptr = (uint8 *)talloc_memdup(
506                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
507                          result->value.dsize))) {
508                 DEBUG(0, ("talloc failed\n"));
509                 TALLOC_FREE(result);
510         }
511
512         SAFE_FREE(ctdb_data.dptr);
513
514         return result;
515 }
516
517 static int db_ctdb_record_destructor(struct db_record **recp)
518 {
519         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
520         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
521                 rec->private_data, struct db_ctdb_transaction_handle);
522         int ret = h->ctx->db->transaction_commit(h->ctx->db);
523         if (ret != 0) {
524                 DEBUG(0,(__location__ " transaction_commit failed\n"));
525         }
526         return 0;
527 }
528
529 /*
530   auto-create a transaction for persistent databases
531  */
532 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
533                                                          TALLOC_CTX *mem_ctx,
534                                                          TDB_DATA key)
535 {
536         int res;
537         struct db_record *rec, **recp;
538
539         res = db_ctdb_transaction_start(ctx->db);
540         if (res == -1) {
541                 return NULL;
542         }
543
544         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
545         if (rec == NULL) {
546                 ctx->db->transaction_cancel(ctx->db);           
547                 return NULL;
548         }
549
550         /* destroy this transaction when we release the lock */
551         recp = talloc(rec, struct db_record *);
552         if (recp == NULL) {
553                 ctx->db->transaction_cancel(ctx->db);
554                 talloc_free(rec);
555                 return NULL;
556         }
557         *recp = rec;
558         talloc_set_destructor(recp, db_ctdb_record_destructor);
559         return rec;
560 }
561
562
563 /*
564   stores a record inside a transaction
565  */
566 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
567                                      TDB_DATA key, TDB_DATA data)
568 {
569         TALLOC_CTX *tmp_ctx = talloc_new(h);
570         int ret;
571         TDB_DATA rec;
572         struct ctdb_ltdb_header header;
573         NTSTATUS status;
574
575         /* we need the header so we can update the RSN */
576         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
577         if (rec.dptr == NULL) {
578                 /* the record doesn't exist - create one with us as dmaster.
579                    This is only safe because we are in a transaction and this
580                    is a persistent database */
581                 ZERO_STRUCT(header);
582         } else {
583                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
584                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
585                 /* a special case, we are writing the same data that is there now */
586                 if (data.dsize == rec.dsize &&
587                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
588                         SAFE_FREE(rec.dptr);
589                         talloc_free(tmp_ctx);
590                         return 0;
591                 }
592                 SAFE_FREE(rec.dptr);
593         }
594
595         header.dmaster = get_my_vnn();
596         header.rsn++;
597
598         if (!h->in_replay) {
599                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
600                 if (h->m_all == NULL) {
601                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
602                         talloc_free(tmp_ctx);
603                         return -1;
604                 }
605         }
606
607         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
608         if (h->m_write == NULL) {
609                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
610                 talloc_free(tmp_ctx);
611                 return -1;
612         }
613
614         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
615         if (NT_STATUS_IS_OK(status)) {
616                 ret = 0;
617         } else {
618                 ret = -1;
619         }
620
621         talloc_free(tmp_ctx);
622
623         return ret;
624 }
625
626
627 /* 
628    a record store inside a transaction
629  */
630 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
631 {
632         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
633                 rec->private_data, struct db_ctdb_transaction_handle);
634         int ret;
635
636         ret = db_ctdb_transaction_store(h, rec->key, data);
637         if (ret != 0) {
638                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
639         }
640         return NT_STATUS_OK;
641 }
642
643 /* 
644    a record delete inside a transaction
645  */
646 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
647 {
648         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
649                 rec->private_data, struct db_ctdb_transaction_handle);
650         int ret;
651
652         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
653         if (ret != 0) {
654                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
655         }
656         return NT_STATUS_OK;
657 }
658
659
660 /*
661   replay a transaction
662  */
663 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
664 {
665         int ret, i;
666         struct ctdb_rec_data *rec = NULL;
667
668         h->in_replay = true;
669         talloc_free(h->m_write);
670         h->m_write = NULL;
671
672         ret = db_ctdb_transaction_fetch_start(h);
673         if (ret != 0) {
674                 return ret;
675         }
676
677         for (i=0;i<h->m_all->count;i++) {
678                 TDB_DATA key, data;
679
680                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
681                 if (rec == NULL) {
682                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
683                         goto failed;
684                 }
685
686                 if (rec->reqid == 0) {
687                         /* its a store */
688                         if (db_ctdb_transaction_store(h, key, data) != 0) {
689                                 goto failed;
690                         }
691                 } else {
692                         TDB_DATA data2;
693                         TALLOC_CTX *tmp_ctx = talloc_new(h);
694
695                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
696                                 talloc_free(tmp_ctx);
697                                 goto failed;
698                         }
699                         if (data2.dsize != data.dsize ||
700                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
701                                 /* the record has changed on us - we have to give up */
702                                 talloc_free(tmp_ctx);
703                                 goto failed;
704                         }
705                         talloc_free(tmp_ctx);
706                 }
707         }
708
709         return 0;
710
711 failed:
712         tdb_transaction_cancel(h->ctx->wtdb->tdb);
713         return -1;
714 }
715
716
717 /*
718   commit a transaction
719  */
720 static int db_ctdb_transaction_commit(struct db_context *db)
721 {
722         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
723                                                         struct db_ctdb_ctx);
724         NTSTATUS rets;
725         int ret;
726         int status;
727         int retries = 0;
728         struct db_ctdb_transaction_handle *h = ctx->transaction;
729         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
730
731         if (h == NULL) {
732                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
733                 return -1;
734         }
735
736         if (h->nested_cancel) {
737                 db->transaction_cancel(db);
738                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
739                 return -1;
740         }
741
742         if (h->nesting != 0) {
743                 h->nesting--;
744                 return 0;
745         }
746
747         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
748
749         talloc_set_destructor(h, NULL);
750
751         /* our commit strategy is quite complex.
752
753            - we first try to commit the changes to all other nodes
754
755            - if that works, then we commit locally and we are done
756
757            - if a commit on another node fails, then we need to cancel
758              the transaction, then restart the transaction (thus
759              opening a window of time for a pending recovery to
760              complete), then replay the transaction, checking all the
761              reads and writes (checking that reads give the same data,
762              and writes succeed). Then we retry the transaction to the
763              other nodes
764         */
765
766 again:
767         if (h->m_write == NULL) {
768                 /* no changes were made, potentially after a retry */
769                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
770                 talloc_free(h);
771                 ctx->transaction = NULL;
772                 return 0;
773         }
774
775         /* tell ctdbd to commit to the other nodes */
776         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
777                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
778                                    h->ctx->db_id, 0,
779                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
780         if (!NT_STATUS_IS_OK(rets) || status != 0) {
781                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
782                 sleep(1);
783
784                 if (!NT_STATUS_IS_OK(rets)) {
785                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
786                 } else {
787                         /* work out what error code we will give if we 
788                            have to fail the operation */
789                         switch ((enum ctdb_trans2_commit_error)status) {
790                         case CTDB_TRANS2_COMMIT_SUCCESS:
791                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
792                         case CTDB_TRANS2_COMMIT_TIMEOUT:
793                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
794                                 break;
795                         case CTDB_TRANS2_COMMIT_ALLFAIL:
796                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
797                                 break;
798                         }
799                 }
800
801                 if (++retries == 5) {
802                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
803                                  h->ctx->db_id, retries, (unsigned)failure_control));
804                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
805                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
806                                             tdb_null, NULL, NULL, NULL);
807                         h->ctx->transaction = NULL;
808                         talloc_free(h);
809                         ctx->transaction = NULL;
810                         return -1;                      
811                 }
812
813                 if (ctdb_replay_transaction(h) != 0) {
814                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
815                                  (unsigned)failure_control));
816                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
817                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
818                                             tdb_null, NULL, NULL, NULL);
819                         h->ctx->transaction = NULL;
820                         talloc_free(h);
821                         ctx->transaction = NULL;
822                         return -1;
823                 }
824                 goto again;
825         } else {
826                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
827         }
828
829         /* do the real commit locally */
830         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
831         if (ret != 0) {
832                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
833                          (unsigned)failure_control));
834                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
835                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
836                 h->ctx->transaction = NULL;
837                 talloc_free(h);
838                 return ret;
839         }
840
841         /* tell ctdbd that we are finished with our local commit */
842         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
843                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
844                             tdb_null, NULL, NULL, NULL);
845         h->ctx->transaction = NULL;
846         talloc_free(h);
847         return 0;
848 }
849
850
851 /*
852   cancel a transaction
853  */
854 static int db_ctdb_transaction_cancel(struct db_context *db)
855 {
856         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
857                                                         struct db_ctdb_ctx);
858         struct db_ctdb_transaction_handle *h = ctx->transaction;
859
860         if (h == NULL) {
861                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
862                 return -1;
863         }
864
865         if (h->nesting != 0) {
866                 h->nesting--;
867                 h->nested_cancel = true;
868                 return 0;
869         }
870
871         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
872
873         ctx->transaction = NULL;
874         talloc_free(h);
875         return 0;
876 }
877
878
879 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
880 {
881         struct db_ctdb_rec *crec = talloc_get_type_abort(
882                 rec->private_data, struct db_ctdb_rec);
883
884         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
885 }
886
887
888
889 static NTSTATUS db_ctdb_delete(struct db_record *rec)
890 {
891         TDB_DATA data;
892
893         /*
894          * We have to store the header with empty data. TODO: Fix the
895          * tdb-level cleanup
896          */
897
898         ZERO_STRUCT(data);
899
900         return db_ctdb_store(rec, data, 0);
901
902 }
903
904 static int db_ctdb_record_destr(struct db_record* data)
905 {
906         struct db_ctdb_rec *crec = talloc_get_type_abort(
907                 data->private_data, struct db_ctdb_rec);
908
909         DEBUG(10, (DEBUGLEVEL > 10
910                    ? "Unlocking db %u key %s\n"
911                    : "Unlocking db %u key %.20s\n",
912                    (int)crec->ctdb_ctx->db_id,
913                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
914                               data->key.dsize)));
915
916         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
917                 DEBUG(0, ("tdb_chainunlock failed\n"));
918                 return -1;
919         }
920
921         return 0;
922 }
923
924 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
925                                                TALLOC_CTX *mem_ctx,
926                                                TDB_DATA key,
927                                                bool persistent)
928 {
929         struct db_record *result;
930         struct db_ctdb_rec *crec;
931         NTSTATUS status;
932         TDB_DATA ctdb_data;
933         int migrate_attempts = 0;
934
935         if (!(result = talloc(mem_ctx, struct db_record))) {
936                 DEBUG(0, ("talloc failed\n"));
937                 return NULL;
938         }
939
940         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
941                 DEBUG(0, ("talloc failed\n"));
942                 TALLOC_FREE(result);
943                 return NULL;
944         }
945
946         result->private_data = (void *)crec;
947         crec->ctdb_ctx = ctx;
948
949         result->key.dsize = key.dsize;
950         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
951         if (result->key.dptr == NULL) {
952                 DEBUG(0, ("talloc failed\n"));
953                 TALLOC_FREE(result);
954                 return NULL;
955         }
956
957         /*
958          * Do a blocking lock on the record
959          */
960 again:
961
962         if (DEBUGLEVEL >= 10) {
963                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
964                 DEBUG(10, (DEBUGLEVEL > 10
965                            ? "Locking db %u key %s\n"
966                            : "Locking db %u key %.20s\n",
967                            (int)crec->ctdb_ctx->db_id, keystr));
968                 TALLOC_FREE(keystr);
969         }
970
971         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
972                 DEBUG(3, ("tdb_chainlock failed\n"));
973                 TALLOC_FREE(result);
974                 return NULL;
975         }
976
977         result->store = db_ctdb_store;
978         result->delete_rec = db_ctdb_delete;
979         talloc_set_destructor(result, db_ctdb_record_destr);
980
981         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
982
983         /*
984          * See if we have a valid record and we are the dmaster. If so, we can
985          * take the shortcut and just return it.
986          */
987
988         if ((ctdb_data.dptr == NULL) ||
989             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
990             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
991 #if 0
992             || (random() % 2 != 0)
993 #endif
994 ) {
995                 SAFE_FREE(ctdb_data.dptr);
996                 tdb_chainunlock(ctx->wtdb->tdb, key);
997                 talloc_set_destructor(result, NULL);
998
999                 migrate_attempts += 1;
1000
1001                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1002                            ctdb_data.dptr, ctdb_data.dptr ?
1003                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1004                            get_my_vnn()));
1005
1006                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1007                 if (!NT_STATUS_IS_OK(status)) {
1008                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1009                                   nt_errstr(status)));
1010                         TALLOC_FREE(result);
1011                         return NULL;
1012                 }
1013                 /* now its migrated, try again */
1014                 goto again;
1015         }
1016
1017         if (migrate_attempts > 10) {
1018                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1019                           migrate_attempts));
1020         }
1021
1022         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1023
1024         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1025         result->value.dptr = NULL;
1026
1027         if ((result->value.dsize != 0)
1028             && !(result->value.dptr = (uint8 *)talloc_memdup(
1029                          result, ctdb_data.dptr + sizeof(crec->header),
1030                          result->value.dsize))) {
1031                 DEBUG(0, ("talloc failed\n"));
1032                 TALLOC_FREE(result);
1033         }
1034
1035         SAFE_FREE(ctdb_data.dptr);
1036
1037         return result;
1038 }
1039
1040 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1041                                               TALLOC_CTX *mem_ctx,
1042                                               TDB_DATA key)
1043 {
1044         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1045                                                         struct db_ctdb_ctx);
1046
1047         if (ctx->transaction != NULL) {
1048                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1049         }
1050
1051         if (db->persistent) {
1052                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1053         }
1054
1055         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1056 }
1057
1058 /*
1059   fetch (unlocked, no migration) operation on ctdb
1060  */
1061 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1062                          TDB_DATA key, TDB_DATA *data)
1063 {
1064         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1065                                                         struct db_ctdb_ctx);
1066         NTSTATUS status;
1067         TDB_DATA ctdb_data;
1068
1069         if (ctx->transaction) {
1070                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1071         }
1072
1073         /* try a direct fetch */
1074         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1075
1076         /*
1077          * See if we have a valid record and we are the dmaster. If so, we can
1078          * take the shortcut and just return it.
1079          * we bypass the dmaster check for persistent databases
1080          */
1081         if ((ctdb_data.dptr != NULL) &&
1082             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1083             (db->persistent ||
1084              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1085                 /* we are the dmaster - avoid the ctdb protocol op */
1086
1087                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1088                 if (data->dsize == 0) {
1089                         SAFE_FREE(ctdb_data.dptr);
1090                         data->dptr = NULL;
1091                         return 0;
1092                 }
1093
1094                 data->dptr = (uint8 *)talloc_memdup(
1095                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1096                         data->dsize);
1097
1098                 SAFE_FREE(ctdb_data.dptr);
1099
1100                 if (data->dptr == NULL) {
1101                         return -1;
1102                 }
1103                 return 0;
1104         }
1105
1106         SAFE_FREE(ctdb_data.dptr);
1107
1108         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1109         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1110         if (!NT_STATUS_IS_OK(status)) {
1111                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1112                 return -1;
1113         }
1114
1115         return 0;
1116 }
1117
1118 struct traverse_state {
1119         struct db_context *db;
1120         int (*fn)(struct db_record *rec, void *private_data);
1121         void *private_data;
1122 };
1123
1124 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1125 {
1126         struct traverse_state *state = (struct traverse_state *)private_data;
1127         struct db_record *rec;
1128         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1129         /* we have to give them a locked record to prevent races */
1130         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1131         if (rec && rec->value.dsize > 0) {
1132                 state->fn(rec, state->private_data);
1133         }
1134         talloc_free(tmp_ctx);
1135 }
1136
1137 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1138                                         void *private_data)
1139 {
1140         struct traverse_state *state = (struct traverse_state *)private_data;
1141         struct db_record *rec;
1142         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1143         int ret = 0;
1144         /* we have to give them a locked record to prevent races */
1145         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1146         if (rec && rec->value.dsize > 0) {
1147                 ret = state->fn(rec, state->private_data);
1148         }
1149         talloc_free(tmp_ctx);
1150         return ret;
1151 }
1152
1153 static int db_ctdb_traverse(struct db_context *db,
1154                             int (*fn)(struct db_record *rec,
1155                                       void *private_data),
1156                             void *private_data)
1157 {
1158         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1159                                                         struct db_ctdb_ctx);
1160         struct traverse_state state;
1161
1162         state.db = db;
1163         state.fn = fn;
1164         state.private_data = private_data;
1165
1166         if (db->persistent) {
1167                 /* for persistent databases we don't need to do a ctdb traverse,
1168                    we can do a faster local traverse */
1169                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1170         }
1171
1172
1173         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1174         return 0;
1175 }
1176
1177 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1178 {
1179         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1180 }
1181
1182 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1183 {
1184         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1185 }
1186
1187 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1188 {
1189         struct traverse_state *state = (struct traverse_state *)private_data;
1190         struct db_record rec;
1191         rec.key = key;
1192         rec.value = data;
1193         rec.store = db_ctdb_store_deny;
1194         rec.delete_rec = db_ctdb_delete_deny;
1195         rec.private_data = state->db;
1196         state->fn(&rec, state->private_data);
1197 }
1198
1199 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1200                                         void *private_data)
1201 {
1202         struct traverse_state *state = (struct traverse_state *)private_data;
1203         struct db_record rec;
1204         rec.key = kbuf;
1205         rec.value = dbuf;
1206         rec.store = db_ctdb_store_deny;
1207         rec.delete_rec = db_ctdb_delete_deny;
1208         rec.private_data = state->db;
1209
1210         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1211                 /* a deleted record */
1212                 return 0;
1213         }
1214         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1215         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1216
1217         return state->fn(&rec, state->private_data);
1218 }
1219
1220 static int db_ctdb_traverse_read(struct db_context *db,
1221                                  int (*fn)(struct db_record *rec,
1222                                            void *private_data),
1223                                  void *private_data)
1224 {
1225         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1226                                                         struct db_ctdb_ctx);
1227         struct traverse_state state;
1228
1229         state.db = db;
1230         state.fn = fn;
1231         state.private_data = private_data;
1232
1233         if (db->persistent) {
1234                 /* for persistent databases we don't need to do a ctdb traverse,
1235                    we can do a faster local traverse */
1236                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1237         }
1238
1239         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1240         return 0;
1241 }
1242
1243 static int db_ctdb_get_seqnum(struct db_context *db)
1244 {
1245         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1246                                                         struct db_ctdb_ctx);
1247         return tdb_get_seqnum(ctx->wtdb->tdb);
1248 }
1249
1250 static int db_ctdb_get_flags(struct db_context *db)
1251 {
1252         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1253                                                         struct db_ctdb_ctx);
1254         return tdb_get_flags(ctx->wtdb->tdb);
1255 }
1256
1257 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1258                                 const char *name,
1259                                 int hash_size, int tdb_flags,
1260                                 int open_flags, mode_t mode)
1261 {
1262         struct db_context *result;
1263         struct db_ctdb_ctx *db_ctdb;
1264         char *db_path;
1265
1266         if (!lp_clustering()) {
1267                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1268                 return NULL;
1269         }
1270
1271         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1272                 DEBUG(0, ("talloc failed\n"));
1273                 TALLOC_FREE(result);
1274                 return NULL;
1275         }
1276
1277         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1278                 DEBUG(0, ("talloc failed\n"));
1279                 TALLOC_FREE(result);
1280                 return NULL;
1281         }
1282
1283         db_ctdb->transaction = NULL;
1284         db_ctdb->db = result;
1285
1286         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1287                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1288                 TALLOC_FREE(result);
1289                 return NULL;
1290         }
1291
1292         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1293
1294         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1295
1296         /* only pass through specific flags */
1297         tdb_flags &= TDB_SEQNUM;
1298
1299         /* honor permissions if user has specified O_CREAT */
1300         if (open_flags & O_CREAT) {
1301                 chmod(db_path, mode);
1302         }
1303
1304         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1305         if (db_ctdb->wtdb == NULL) {
1306                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1307                 TALLOC_FREE(result);
1308                 return NULL;
1309         }
1310         talloc_free(db_path);
1311
1312         result->private_data = (void *)db_ctdb;
1313         result->fetch_locked = db_ctdb_fetch_locked;
1314         result->fetch = db_ctdb_fetch;
1315         result->traverse = db_ctdb_traverse;
1316         result->traverse_read = db_ctdb_traverse_read;
1317         result->get_seqnum = db_ctdb_get_seqnum;
1318         result->get_flags = db_ctdb_get_flags;
1319         result->transaction_start = db_ctdb_transaction_start;
1320         result->transaction_commit = db_ctdb_transaction_commit;
1321         result->transaction_cancel = db_ctdb_transaction_cancel;
1322
1323         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1324                  name, db_ctdb->db_id));
1325
1326         return result;
1327 }
1328 #endif