1d207e7abf52bf88f7c093502430897b62a5be0b
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /*
79  * Store a record together with the ctdb record header
80  * in the local copy of the database.
81  */
82 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TDB_DATA data)
86 {
87         TALLOC_CTX *tmp_ctx = talloc_stackframe();
88         TDB_DATA rec;
89         int ret;
90
91         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
92         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
93
94         if (rec.dptr == NULL) {
95                 talloc_free(tmp_ctx);
96                 return NT_STATUS_NO_MEMORY;
97         }
98
99         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
100         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
101
102         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
103
104         talloc_free(tmp_ctx);
105
106         return (ret == 0) ? NT_STATUS_OK
107                           : tdb_error_to_ntstatus(db->wtdb->tdb);
108
109 }
110
111 /*
112   form a ctdb_rec_data record from a key/data pair
113
114   note that header may be NULL. If not NULL then it is included in the data portion
115   of the record
116  */
117 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
118                                                   TDB_DATA key, 
119                                                   struct ctdb_ltdb_header *header,
120                                                   TDB_DATA data)
121 {
122         size_t length;
123         struct ctdb_rec_data *d;
124
125         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
126                 data.dsize + (header?sizeof(*header):0);
127         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
128         if (d == NULL) {
129                 return NULL;
130         }
131         d->length = length;
132         d->reqid = reqid;
133         d->keylen = key.dsize;
134         memcpy(&d->data[0], key.dptr, key.dsize);
135         if (header) {
136                 d->datalen = data.dsize + sizeof(*header);
137                 memcpy(&d->data[key.dsize], header, sizeof(*header));
138                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
139         } else {
140                 d->datalen = data.dsize;
141                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
142         }
143         return d;
144 }
145
146
147 /* helper function for marshalling multiple records */
148 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
149                                                struct ctdb_marshall_buffer *m,
150                                                uint64_t db_id,
151                                                uint32_t reqid,
152                                                TDB_DATA key,
153                                                struct ctdb_ltdb_header *header,
154                                                TDB_DATA data)
155 {
156         struct ctdb_rec_data *r;
157         size_t m_size, r_size;
158         struct ctdb_marshall_buffer *m2 = NULL;
159
160         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
161         if (r == NULL) {
162                 talloc_free(m);
163                 return NULL;
164         }
165
166         if (m == NULL) {
167                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
168                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
169                 if (m == NULL) {
170                         goto done;
171                 }
172                 m->db_id = db_id;
173         }
174
175         m_size = talloc_get_size(m);
176         r_size = talloc_get_size(r);
177
178         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
179                 mem_ctx, m,  m_size + r_size);
180         if (m2 == NULL) {
181                 talloc_free(m);
182                 goto done;
183         }
184
185         memcpy(m_size + (uint8_t *)m2, r, r_size);
186
187         m2->count++;
188
189 done:
190         talloc_free(r);
191         return m2;
192 }
193
194 /* we've finished marshalling, return a data blob with the marshalled records */
195 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
196 {
197         TDB_DATA data;
198         data.dptr = (uint8_t *)m;
199         data.dsize = talloc_get_size(m);
200         return data;
201 }
202
203 /* 
204    loop over a marshalling buffer 
205
206      - pass r==NULL to start
207      - loop the number of times indicated by m->count
208 */
209 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
210                                                      uint32_t *reqid,
211                                                      struct ctdb_ltdb_header *header,
212                                                      TDB_DATA *key, TDB_DATA *data)
213 {
214         if (r == NULL) {
215                 r = (struct ctdb_rec_data *)&m->data[0];
216         } else {
217                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
218         }
219
220         if (reqid != NULL) {
221                 *reqid = r->reqid;
222         }
223
224         if (key != NULL) {
225                 key->dptr   = &r->data[0];
226                 key->dsize  = r->keylen;
227         }
228         if (data != NULL) {
229                 data->dptr  = &r->data[r->keylen];
230                 data->dsize = r->datalen;
231                 if (header != NULL) {
232                         data->dptr += sizeof(*header);
233                         data->dsize -= sizeof(*header);
234                 }
235         }
236
237         if (header != NULL) {
238                 if (r->datalen < sizeof(*header)) {
239                         return NULL;
240                 }
241                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
242         }
243
244         return r;
245 }
246
247
248
249 /**
250  * CTDB transaction destructor
251  */
252 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
253 {
254         tdb_transaction_cancel(h->ctx->wtdb->tdb);
255         return 0;
256 }
257
258 /**
259  * start a transaction on a ctdb database:
260  * - lock the transaction lock key
261  * - start the tdb transaction
262  */
263 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
264 {
265         struct db_record *rh;
266         TDB_DATA key;
267         TALLOC_CTX *tmp_ctx;
268         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
269         int ret;
270         struct db_ctdb_ctx *ctx = h->ctx;
271         TDB_DATA data;
272
273         key.dptr = (uint8_t *)discard_const(keyname);
274         key.dsize = strlen(keyname);
275
276 again:
277         tmp_ctx = talloc_new(h);
278
279         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
280         if (rh == NULL) {
281                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
282                 talloc_free(tmp_ctx);
283                 return -1;
284         }
285         talloc_free(rh);
286
287         ret = tdb_transaction_start(ctx->wtdb->tdb);
288         if (ret != 0) {
289                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
290                 talloc_free(tmp_ctx);
291                 return -1;
292         }
293
294         data = tdb_fetch(ctx->wtdb->tdb, key);
295         if ((data.dptr == NULL) ||
296             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
297             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
298                 SAFE_FREE(data.dptr);
299                 tdb_transaction_cancel(ctx->wtdb->tdb);
300                 talloc_free(tmp_ctx);
301                 goto again;
302         }
303
304         SAFE_FREE(data.dptr);
305         talloc_free(tmp_ctx);
306
307         return 0;
308 }
309
310
311 /**
312  * CTDB dbwrap API: transaction_start function
313  * starts a transaction on a persistent database
314  */
315 static int db_ctdb_transaction_start(struct db_context *db)
316 {
317         struct db_ctdb_transaction_handle *h;
318         int ret;
319         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
320                                                         struct db_ctdb_ctx);
321
322         if (!db->persistent) {
323                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
324                          ctx->db_id));
325                 return -1;
326         }
327
328         if (ctx->transaction) {
329                 ctx->transaction->nesting++;
330                 return 0;
331         }
332
333         h = talloc_zero(db, struct db_ctdb_transaction_handle);
334         if (h == NULL) {
335                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
336                 return -1;
337         }
338
339         h->ctx = ctx;
340
341         ret = db_ctdb_transaction_fetch_start(h);
342         if (ret != 0) {
343                 talloc_free(h);
344                 return -1;
345         }
346
347         talloc_set_destructor(h, db_ctdb_transaction_destructor);
348
349         ctx->transaction = h;
350
351         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
352
353         return 0;
354 }
355
356
357
358 /*
359   fetch a record inside a transaction
360  */
361 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
362                                      TALLOC_CTX *mem_ctx, 
363                                      TDB_DATA key, TDB_DATA *data)
364 {
365         struct db_ctdb_transaction_handle *h = db->transaction;
366
367         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
368
369         if (data->dptr != NULL) {
370                 uint8_t *oldptr = (uint8_t *)data->dptr;
371                 data->dsize -= sizeof(struct ctdb_ltdb_header);
372                 if (data->dsize == 0) {
373                         data->dptr = NULL;
374                 } else {
375                         data->dptr = (uint8 *)
376                                 talloc_memdup(
377                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
378                                         data->dsize);
379                 }
380                 SAFE_FREE(oldptr);
381                 if (data->dptr == NULL && data->dsize != 0) {
382                         return -1;
383                 }
384         }
385
386         if (!h->in_replay) {
387                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
388                 if (h->m_all == NULL) {
389                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
390                         data->dsize = 0;
391                         talloc_free(data->dptr);
392                         return -1;
393                 }
394         }
395
396         return 0;
397 }
398
399
400 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
401 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
402
403 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
404                                                           TALLOC_CTX *mem_ctx,
405                                                           TDB_DATA key)
406 {
407         struct db_record *result;
408         TDB_DATA ctdb_data;
409
410         if (!(result = talloc(mem_ctx, struct db_record))) {
411                 DEBUG(0, ("talloc failed\n"));
412                 return NULL;
413         }
414
415         result->private_data = ctx->transaction;
416
417         result->key.dsize = key.dsize;
418         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
419         if (result->key.dptr == NULL) {
420                 DEBUG(0, ("talloc failed\n"));
421                 TALLOC_FREE(result);
422                 return NULL;
423         }
424
425         result->store = db_ctdb_store_transaction;
426         result->delete_rec = db_ctdb_delete_transaction;
427
428         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
429         if (ctdb_data.dptr == NULL) {
430                 /* create the record */
431                 result->value = tdb_null;
432                 return result;
433         }
434
435         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
436         result->value.dptr = NULL;
437
438         if ((result->value.dsize != 0)
439             && !(result->value.dptr = (uint8 *)talloc_memdup(
440                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
441                          result->value.dsize))) {
442                 DEBUG(0, ("talloc failed\n"));
443                 TALLOC_FREE(result);
444         }
445
446         SAFE_FREE(ctdb_data.dptr);
447
448         return result;
449 }
450
451 static int db_ctdb_record_destructor(struct db_record **recp)
452 {
453         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
454         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
455                 rec->private_data, struct db_ctdb_transaction_handle);
456         int ret = h->ctx->db->transaction_commit(h->ctx->db);
457         if (ret != 0) {
458                 DEBUG(0,(__location__ " transaction_commit failed\n"));
459         }
460         return 0;
461 }
462
463 /*
464   auto-create a transaction for persistent databases
465  */
466 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
467                                                          TALLOC_CTX *mem_ctx,
468                                                          TDB_DATA key)
469 {
470         int res;
471         struct db_record *rec, **recp;
472
473         res = db_ctdb_transaction_start(ctx->db);
474         if (res == -1) {
475                 return NULL;
476         }
477
478         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
479         if (rec == NULL) {
480                 ctx->db->transaction_cancel(ctx->db);           
481                 return NULL;
482         }
483
484         /* destroy this transaction when we release the lock */
485         recp = talloc(rec, struct db_record *);
486         if (recp == NULL) {
487                 ctx->db->transaction_cancel(ctx->db);
488                 talloc_free(rec);
489                 return NULL;
490         }
491         *recp = rec;
492         talloc_set_destructor(recp, db_ctdb_record_destructor);
493         return rec;
494 }
495
496
497 /*
498   stores a record inside a transaction
499  */
500 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
501                                      TDB_DATA key, TDB_DATA data)
502 {
503         TALLOC_CTX *tmp_ctx = talloc_new(h);
504         int ret;
505         TDB_DATA rec;
506         struct ctdb_ltdb_header header;
507         NTSTATUS status;
508
509         /* we need the header so we can update the RSN */
510         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
511         if (rec.dptr == NULL) {
512                 /* the record doesn't exist - create one with us as dmaster.
513                    This is only safe because we are in a transaction and this
514                    is a persistent database */
515                 ZERO_STRUCT(header);
516         } else {
517                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
518                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
519                 /* a special case, we are writing the same data that is there now */
520                 if (data.dsize == rec.dsize &&
521                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
522                         SAFE_FREE(rec.dptr);
523                         talloc_free(tmp_ctx);
524                         return 0;
525                 }
526                 SAFE_FREE(rec.dptr);
527         }
528
529         header.dmaster = get_my_vnn();
530         header.rsn++;
531
532         if (!h->in_replay) {
533                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
534                 if (h->m_all == NULL) {
535                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
536                         talloc_free(tmp_ctx);
537                         return -1;
538                 }
539         }
540
541         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
542         if (h->m_write == NULL) {
543                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
544                 talloc_free(tmp_ctx);
545                 return -1;
546         }
547
548         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
549         if (NT_STATUS_IS_OK(status)) {
550                 ret = 0;
551         } else {
552                 ret = -1;
553         }
554
555         talloc_free(tmp_ctx);
556
557         return ret;
558 }
559
560
561 /* 
562    a record store inside a transaction
563  */
564 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
565 {
566         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
567                 rec->private_data, struct db_ctdb_transaction_handle);
568         int ret;
569
570         ret = db_ctdb_transaction_store(h, rec->key, data);
571         if (ret != 0) {
572                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
573         }
574         return NT_STATUS_OK;
575 }
576
577 /* 
578    a record delete inside a transaction
579  */
580 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
581 {
582         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
583                 rec->private_data, struct db_ctdb_transaction_handle);
584         int ret;
585
586         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
587         if (ret != 0) {
588                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
589         }
590         return NT_STATUS_OK;
591 }
592
593
594 /*
595   replay a transaction
596  */
597 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
598 {
599         int ret, i;
600         struct ctdb_rec_data *rec = NULL;
601
602         h->in_replay = true;
603         talloc_free(h->m_write);
604         h->m_write = NULL;
605
606         ret = db_ctdb_transaction_fetch_start(h);
607         if (ret != 0) {
608                 return ret;
609         }
610
611         for (i=0;i<h->m_all->count;i++) {
612                 TDB_DATA key, data;
613
614                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
615                 if (rec == NULL) {
616                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
617                         goto failed;
618                 }
619
620                 if (rec->reqid == 0) {
621                         /* its a store */
622                         if (db_ctdb_transaction_store(h, key, data) != 0) {
623                                 goto failed;
624                         }
625                 } else {
626                         TDB_DATA data2;
627                         TALLOC_CTX *tmp_ctx = talloc_new(h);
628
629                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
630                                 talloc_free(tmp_ctx);
631                                 goto failed;
632                         }
633                         if (data2.dsize != data.dsize ||
634                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
635                                 /* the record has changed on us - we have to give up */
636                                 talloc_free(tmp_ctx);
637                                 goto failed;
638                         }
639                         talloc_free(tmp_ctx);
640                 }
641         }
642
643         return 0;
644
645 failed:
646         tdb_transaction_cancel(h->ctx->wtdb->tdb);
647         return -1;
648 }
649
650
651 /*
652   commit a transaction
653  */
654 static int db_ctdb_transaction_commit(struct db_context *db)
655 {
656         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
657                                                         struct db_ctdb_ctx);
658         NTSTATUS rets;
659         int ret;
660         int status;
661         int retries = 0;
662         struct db_ctdb_transaction_handle *h = ctx->transaction;
663         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
664
665         if (h == NULL) {
666                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
667                 return -1;
668         }
669
670         if (h->nested_cancel) {
671                 db->transaction_cancel(db);
672                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
673                 return -1;
674         }
675
676         if (h->nesting != 0) {
677                 h->nesting--;
678                 return 0;
679         }
680
681         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
682
683         talloc_set_destructor(h, NULL);
684
685         /* our commit strategy is quite complex.
686
687            - we first try to commit the changes to all other nodes
688
689            - if that works, then we commit locally and we are done
690
691            - if a commit on another node fails, then we need to cancel
692              the transaction, then restart the transaction (thus
693              opening a window of time for a pending recovery to
694              complete), then replay the transaction, checking all the
695              reads and writes (checking that reads give the same data,
696              and writes succeed). Then we retry the transaction to the
697              other nodes
698         */
699
700 again:
701         if (h->m_write == NULL) {
702                 /* no changes were made, potentially after a retry */
703                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
704                 talloc_free(h);
705                 ctx->transaction = NULL;
706                 return 0;
707         }
708
709         /* tell ctdbd to commit to the other nodes */
710         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
711                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
712                                    h->ctx->db_id, 0,
713                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
714         if (!NT_STATUS_IS_OK(rets) || status != 0) {
715                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
716                 sleep(1);
717
718                 if (!NT_STATUS_IS_OK(rets)) {
719                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
720                 } else {
721                         /* work out what error code we will give if we 
722                            have to fail the operation */
723                         switch ((enum ctdb_trans2_commit_error)status) {
724                         case CTDB_TRANS2_COMMIT_SUCCESS:
725                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
726                         case CTDB_TRANS2_COMMIT_TIMEOUT:
727                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
728                                 break;
729                         case CTDB_TRANS2_COMMIT_ALLFAIL:
730                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
731                                 break;
732                         }
733                 }
734
735                 if (++retries == 5) {
736                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
737                                  h->ctx->db_id, retries, (unsigned)failure_control));
738                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
739                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
740                                             tdb_null, NULL, NULL, NULL);
741                         h->ctx->transaction = NULL;
742                         talloc_free(h);
743                         ctx->transaction = NULL;
744                         return -1;                      
745                 }
746
747                 if (ctdb_replay_transaction(h) != 0) {
748                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
749                                  (unsigned)failure_control));
750                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
751                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
752                                             tdb_null, NULL, NULL, NULL);
753                         h->ctx->transaction = NULL;
754                         talloc_free(h);
755                         ctx->transaction = NULL;
756                         return -1;
757                 }
758                 goto again;
759         } else {
760                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
761         }
762
763         /* do the real commit locally */
764         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
765         if (ret != 0) {
766                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
767                          (unsigned)failure_control));
768                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
769                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
770                 h->ctx->transaction = NULL;
771                 talloc_free(h);
772                 return ret;
773         }
774
775         /* tell ctdbd that we are finished with our local commit */
776         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
777                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
778                             tdb_null, NULL, NULL, NULL);
779         h->ctx->transaction = NULL;
780         talloc_free(h);
781         return 0;
782 }
783
784
785 /*
786   cancel a transaction
787  */
788 static int db_ctdb_transaction_cancel(struct db_context *db)
789 {
790         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
791                                                         struct db_ctdb_ctx);
792         struct db_ctdb_transaction_handle *h = ctx->transaction;
793
794         if (h == NULL) {
795                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
796                 return -1;
797         }
798
799         if (h->nesting != 0) {
800                 h->nesting--;
801                 h->nested_cancel = true;
802                 return 0;
803         }
804
805         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
806
807         ctx->transaction = NULL;
808         talloc_free(h);
809         return 0;
810 }
811
812
813 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
814 {
815         struct db_ctdb_rec *crec = talloc_get_type_abort(
816                 rec->private_data, struct db_ctdb_rec);
817
818         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
819 }
820
821
822
823 static NTSTATUS db_ctdb_delete(struct db_record *rec)
824 {
825         TDB_DATA data;
826
827         /*
828          * We have to store the header with empty data. TODO: Fix the
829          * tdb-level cleanup
830          */
831
832         ZERO_STRUCT(data);
833
834         return db_ctdb_store(rec, data, 0);
835
836 }
837
838 static int db_ctdb_record_destr(struct db_record* data)
839 {
840         struct db_ctdb_rec *crec = talloc_get_type_abort(
841                 data->private_data, struct db_ctdb_rec);
842
843         DEBUG(10, (DEBUGLEVEL > 10
844                    ? "Unlocking db %u key %s\n"
845                    : "Unlocking db %u key %.20s\n",
846                    (int)crec->ctdb_ctx->db_id,
847                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
848                               data->key.dsize)));
849
850         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
851                 DEBUG(0, ("tdb_chainunlock failed\n"));
852                 return -1;
853         }
854
855         return 0;
856 }
857
858 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
859                                                TALLOC_CTX *mem_ctx,
860                                                TDB_DATA key,
861                                                bool persistent)
862 {
863         struct db_record *result;
864         struct db_ctdb_rec *crec;
865         NTSTATUS status;
866         TDB_DATA ctdb_data;
867         int migrate_attempts = 0;
868
869         if (!(result = talloc(mem_ctx, struct db_record))) {
870                 DEBUG(0, ("talloc failed\n"));
871                 return NULL;
872         }
873
874         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
875                 DEBUG(0, ("talloc failed\n"));
876                 TALLOC_FREE(result);
877                 return NULL;
878         }
879
880         result->private_data = (void *)crec;
881         crec->ctdb_ctx = ctx;
882
883         result->key.dsize = key.dsize;
884         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
885         if (result->key.dptr == NULL) {
886                 DEBUG(0, ("talloc failed\n"));
887                 TALLOC_FREE(result);
888                 return NULL;
889         }
890
891         /*
892          * Do a blocking lock on the record
893          */
894 again:
895
896         if (DEBUGLEVEL >= 10) {
897                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
898                 DEBUG(10, (DEBUGLEVEL > 10
899                            ? "Locking db %u key %s\n"
900                            : "Locking db %u key %.20s\n",
901                            (int)crec->ctdb_ctx->db_id, keystr));
902                 TALLOC_FREE(keystr);
903         }
904
905         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
906                 DEBUG(3, ("tdb_chainlock failed\n"));
907                 TALLOC_FREE(result);
908                 return NULL;
909         }
910
911         result->store = db_ctdb_store;
912         result->delete_rec = db_ctdb_delete;
913         talloc_set_destructor(result, db_ctdb_record_destr);
914
915         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
916
917         /*
918          * See if we have a valid record and we are the dmaster. If so, we can
919          * take the shortcut and just return it.
920          */
921
922         if ((ctdb_data.dptr == NULL) ||
923             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
924             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
925 #if 0
926             || (random() % 2 != 0)
927 #endif
928 ) {
929                 SAFE_FREE(ctdb_data.dptr);
930                 tdb_chainunlock(ctx->wtdb->tdb, key);
931                 talloc_set_destructor(result, NULL);
932
933                 migrate_attempts += 1;
934
935                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
936                            ctdb_data.dptr, ctdb_data.dptr ?
937                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
938                            get_my_vnn()));
939
940                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
941                 if (!NT_STATUS_IS_OK(status)) {
942                         DEBUG(5, ("ctdb_migrate failed: %s\n",
943                                   nt_errstr(status)));
944                         TALLOC_FREE(result);
945                         return NULL;
946                 }
947                 /* now its migrated, try again */
948                 goto again;
949         }
950
951         if (migrate_attempts > 10) {
952                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
953                           migrate_attempts));
954         }
955
956         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
957
958         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
959         result->value.dptr = NULL;
960
961         if ((result->value.dsize != 0)
962             && !(result->value.dptr = (uint8 *)talloc_memdup(
963                          result, ctdb_data.dptr + sizeof(crec->header),
964                          result->value.dsize))) {
965                 DEBUG(0, ("talloc failed\n"));
966                 TALLOC_FREE(result);
967         }
968
969         SAFE_FREE(ctdb_data.dptr);
970
971         return result;
972 }
973
974 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
975                                               TALLOC_CTX *mem_ctx,
976                                               TDB_DATA key)
977 {
978         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
979                                                         struct db_ctdb_ctx);
980
981         if (ctx->transaction != NULL) {
982                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
983         }
984
985         if (db->persistent) {
986                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
987         }
988
989         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
990 }
991
992 /*
993   fetch (unlocked, no migration) operation on ctdb
994  */
995 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
996                          TDB_DATA key, TDB_DATA *data)
997 {
998         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
999                                                         struct db_ctdb_ctx);
1000         NTSTATUS status;
1001         TDB_DATA ctdb_data;
1002
1003         if (ctx->transaction) {
1004                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1005         }
1006
1007         /* try a direct fetch */
1008         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1009
1010         /*
1011          * See if we have a valid record and we are the dmaster. If so, we can
1012          * take the shortcut and just return it.
1013          * we bypass the dmaster check for persistent databases
1014          */
1015         if ((ctdb_data.dptr != NULL) &&
1016             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1017             (db->persistent ||
1018              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1019                 /* we are the dmaster - avoid the ctdb protocol op */
1020
1021                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1022                 if (data->dsize == 0) {
1023                         SAFE_FREE(ctdb_data.dptr);
1024                         data->dptr = NULL;
1025                         return 0;
1026                 }
1027
1028                 data->dptr = (uint8 *)talloc_memdup(
1029                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1030                         data->dsize);
1031
1032                 SAFE_FREE(ctdb_data.dptr);
1033
1034                 if (data->dptr == NULL) {
1035                         return -1;
1036                 }
1037                 return 0;
1038         }
1039
1040         SAFE_FREE(ctdb_data.dptr);
1041
1042         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1043         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1044         if (!NT_STATUS_IS_OK(status)) {
1045                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1046                 return -1;
1047         }
1048
1049         return 0;
1050 }
1051
1052 struct traverse_state {
1053         struct db_context *db;
1054         int (*fn)(struct db_record *rec, void *private_data);
1055         void *private_data;
1056 };
1057
1058 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1059 {
1060         struct traverse_state *state = (struct traverse_state *)private_data;
1061         struct db_record *rec;
1062         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1063         /* we have to give them a locked record to prevent races */
1064         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1065         if (rec && rec->value.dsize > 0) {
1066                 state->fn(rec, state->private_data);
1067         }
1068         talloc_free(tmp_ctx);
1069 }
1070
1071 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1072                                         void *private_data)
1073 {
1074         struct traverse_state *state = (struct traverse_state *)private_data;
1075         struct db_record *rec;
1076         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1077         int ret = 0;
1078         /* we have to give them a locked record to prevent races */
1079         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1080         if (rec && rec->value.dsize > 0) {
1081                 ret = state->fn(rec, state->private_data);
1082         }
1083         talloc_free(tmp_ctx);
1084         return ret;
1085 }
1086
1087 static int db_ctdb_traverse(struct db_context *db,
1088                             int (*fn)(struct db_record *rec,
1089                                       void *private_data),
1090                             void *private_data)
1091 {
1092         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1093                                                         struct db_ctdb_ctx);
1094         struct traverse_state state;
1095
1096         state.db = db;
1097         state.fn = fn;
1098         state.private_data = private_data;
1099
1100         if (db->persistent) {
1101                 /* for persistent databases we don't need to do a ctdb traverse,
1102                    we can do a faster local traverse */
1103                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1104         }
1105
1106
1107         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1108         return 0;
1109 }
1110
1111 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1112 {
1113         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1114 }
1115
1116 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1117 {
1118         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1119 }
1120
1121 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1122 {
1123         struct traverse_state *state = (struct traverse_state *)private_data;
1124         struct db_record rec;
1125         rec.key = key;
1126         rec.value = data;
1127         rec.store = db_ctdb_store_deny;
1128         rec.delete_rec = db_ctdb_delete_deny;
1129         rec.private_data = state->db;
1130         state->fn(&rec, state->private_data);
1131 }
1132
1133 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1134                                         void *private_data)
1135 {
1136         struct traverse_state *state = (struct traverse_state *)private_data;
1137         struct db_record rec;
1138         rec.key = kbuf;
1139         rec.value = dbuf;
1140         rec.store = db_ctdb_store_deny;
1141         rec.delete_rec = db_ctdb_delete_deny;
1142         rec.private_data = state->db;
1143
1144         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1145                 /* a deleted record */
1146                 return 0;
1147         }
1148         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1149         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1150
1151         return state->fn(&rec, state->private_data);
1152 }
1153
1154 static int db_ctdb_traverse_read(struct db_context *db,
1155                                  int (*fn)(struct db_record *rec,
1156                                            void *private_data),
1157                                  void *private_data)
1158 {
1159         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1160                                                         struct db_ctdb_ctx);
1161         struct traverse_state state;
1162
1163         state.db = db;
1164         state.fn = fn;
1165         state.private_data = private_data;
1166
1167         if (db->persistent) {
1168                 /* for persistent databases we don't need to do a ctdb traverse,
1169                    we can do a faster local traverse */
1170                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1171         }
1172
1173         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1174         return 0;
1175 }
1176
1177 static int db_ctdb_get_seqnum(struct db_context *db)
1178 {
1179         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1180                                                         struct db_ctdb_ctx);
1181         return tdb_get_seqnum(ctx->wtdb->tdb);
1182 }
1183
1184 static int db_ctdb_get_flags(struct db_context *db)
1185 {
1186         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1187                                                         struct db_ctdb_ctx);
1188         return tdb_get_flags(ctx->wtdb->tdb);
1189 }
1190
1191 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1192                                 const char *name,
1193                                 int hash_size, int tdb_flags,
1194                                 int open_flags, mode_t mode)
1195 {
1196         struct db_context *result;
1197         struct db_ctdb_ctx *db_ctdb;
1198         char *db_path;
1199
1200         if (!lp_clustering()) {
1201                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1202                 return NULL;
1203         }
1204
1205         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1206                 DEBUG(0, ("talloc failed\n"));
1207                 TALLOC_FREE(result);
1208                 return NULL;
1209         }
1210
1211         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1212                 DEBUG(0, ("talloc failed\n"));
1213                 TALLOC_FREE(result);
1214                 return NULL;
1215         }
1216
1217         db_ctdb->transaction = NULL;
1218         db_ctdb->db = result;
1219
1220         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1221                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1222                 TALLOC_FREE(result);
1223                 return NULL;
1224         }
1225
1226         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1227
1228         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1229
1230         /* only pass through specific flags */
1231         tdb_flags &= TDB_SEQNUM;
1232
1233         /* honor permissions if user has specified O_CREAT */
1234         if (open_flags & O_CREAT) {
1235                 chmod(db_path, mode);
1236         }
1237
1238         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1239         if (db_ctdb->wtdb == NULL) {
1240                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1241                 TALLOC_FREE(result);
1242                 return NULL;
1243         }
1244         talloc_free(db_path);
1245
1246         result->private_data = (void *)db_ctdb;
1247         result->fetch_locked = db_ctdb_fetch_locked;
1248         result->fetch = db_ctdb_fetch;
1249         result->traverse = db_ctdb_traverse;
1250         result->traverse_read = db_ctdb_traverse_read;
1251         result->get_seqnum = db_ctdb_get_seqnum;
1252         result->get_flags = db_ctdb_get_flags;
1253         result->transaction_start = db_ctdb_transaction_start;
1254         result->transaction_commit = db_ctdb_transaction_commit;
1255         result->transaction_cancel = db_ctdb_transaction_cancel;
1256
1257         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1258                  name, db_ctdb->db_id));
1259
1260         return result;
1261 }
1262 #endif