ec52d766c0604f5379e72fa41822bbcfc2c89be8
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /*
30          * we store the reads and writes done under a transaction:
31          * - one list stores both reads and writes (m_all),
32          * - the other just writes (m_write)
33          */
34         struct ctdb_marshall_buffer *m_all;
35         struct ctdb_marshall_buffer *m_write;
36         uint32_t nesting;
37         bool nested_cancel;
38 };
39
40 struct db_ctdb_ctx {
41         struct db_context *db;
42         struct tdb_wrap *wtdb;
43         uint32 db_id;
44         struct db_ctdb_transaction_handle *transaction;
45 };
46
47 struct db_ctdb_rec {
48         struct db_ctdb_ctx *ctdb_ctx;
49         struct ctdb_ltdb_header header;
50 };
51
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53                                                TALLOC_CTX *mem_ctx,
54                                                TDB_DATA key,
55                                                bool persistent);
56
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
58 {
59         NTSTATUS status;
60         enum TDB_ERROR tret = tdb_error(tdb);
61
62         switch (tret) {
63         case TDB_ERR_EXISTS:
64                 status = NT_STATUS_OBJECT_NAME_COLLISION;
65                 break;
66         case TDB_ERR_NOEXIST:
67                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68                 break;
69         default:
70                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71                 break;
72         }
73
74         return status;
75 }
76
77
78 /**
79  * fetch a record from the tdb, separating out the header
80  * information and returning the body of the record.
81  */
82 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
83                                    TDB_DATA key,
84                                    struct ctdb_ltdb_header *header,
85                                    TALLOC_CTX *mem_ctx,
86                                    TDB_DATA *data)
87 {
88         TDB_DATA rec;
89         NTSTATUS status;
90
91         rec = tdb_fetch(db->wtdb->tdb, key);
92         if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
93                 status = NT_STATUS_NOT_FOUND;
94                 if (data) {
95                         ZERO_STRUCTP(data);
96                 }
97                 if (header) {
98                         header->dmaster = (uint32_t)-1;
99                         header->rsn = 0;
100                 }
101                 goto done;
102         }
103
104         if (header) {
105                 *header = *(struct ctdb_ltdb_header *)rec.dptr;
106         }
107
108         if (data) {
109                 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
110                 if (data->dsize == 0) {
111                         data->dptr = NULL;
112                 } else {
113                         data->dptr = (unsigned char *)talloc_memdup(mem_ctx,
114                                         rec.dptr
115                                          + sizeof(struct ctdb_ltdb_header),
116                                         data->dsize);
117                         if (data->dptr == NULL) {
118                                 status = NT_STATUS_NO_MEMORY;
119                                 goto done;
120                         }
121                 }
122         }
123
124         status = NT_STATUS_OK;
125
126 done:
127         SAFE_FREE(rec.dptr);
128         return status;
129 }
130
131 /*
132  * Store a record together with the ctdb record header
133  * in the local copy of the database.
134  */
135 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
136                                    TDB_DATA key,
137                                    struct ctdb_ltdb_header *header,
138                                    TDB_DATA data)
139 {
140         TALLOC_CTX *tmp_ctx = talloc_stackframe();
141         TDB_DATA rec;
142         int ret;
143
144         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
145         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
146
147         if (rec.dptr == NULL) {
148                 talloc_free(tmp_ctx);
149                 return NT_STATUS_NO_MEMORY;
150         }
151
152         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
153         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
154
155         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
156
157         talloc_free(tmp_ctx);
158
159         return (ret == 0) ? NT_STATUS_OK
160                           : tdb_error_to_ntstatus(db->wtdb->tdb);
161
162 }
163
164 /*
165   form a ctdb_rec_data record from a key/data pair
166
167   note that header may be NULL. If not NULL then it is included in the data portion
168   of the record
169  */
170 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
171                                                   TDB_DATA key, 
172                                                   struct ctdb_ltdb_header *header,
173                                                   TDB_DATA data)
174 {
175         size_t length;
176         struct ctdb_rec_data *d;
177
178         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
179                 data.dsize + (header?sizeof(*header):0);
180         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
181         if (d == NULL) {
182                 return NULL;
183         }
184         d->length = length;
185         d->reqid = reqid;
186         d->keylen = key.dsize;
187         memcpy(&d->data[0], key.dptr, key.dsize);
188         if (header) {
189                 d->datalen = data.dsize + sizeof(*header);
190                 memcpy(&d->data[key.dsize], header, sizeof(*header));
191                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
192         } else {
193                 d->datalen = data.dsize;
194                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
195         }
196         return d;
197 }
198
199
200 /* helper function for marshalling multiple records */
201 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
202                                                struct ctdb_marshall_buffer *m,
203                                                uint64_t db_id,
204                                                uint32_t reqid,
205                                                TDB_DATA key,
206                                                struct ctdb_ltdb_header *header,
207                                                TDB_DATA data)
208 {
209         struct ctdb_rec_data *r;
210         size_t m_size, r_size;
211         struct ctdb_marshall_buffer *m2 = NULL;
212
213         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
214         if (r == NULL) {
215                 talloc_free(m);
216                 return NULL;
217         }
218
219         if (m == NULL) {
220                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
221                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
222                 if (m == NULL) {
223                         goto done;
224                 }
225                 m->db_id = db_id;
226         }
227
228         m_size = talloc_get_size(m);
229         r_size = talloc_get_size(r);
230
231         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
232                 mem_ctx, m,  m_size + r_size);
233         if (m2 == NULL) {
234                 talloc_free(m);
235                 goto done;
236         }
237
238         memcpy(m_size + (uint8_t *)m2, r, r_size);
239
240         m2->count++;
241
242 done:
243         talloc_free(r);
244         return m2;
245 }
246
247 /* we've finished marshalling, return a data blob with the marshalled records */
248 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
249 {
250         TDB_DATA data;
251         data.dptr = (uint8_t *)m;
252         data.dsize = talloc_get_size(m);
253         return data;
254 }
255
256 /* 
257    loop over a marshalling buffer 
258
259      - pass r==NULL to start
260      - loop the number of times indicated by m->count
261 */
262 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
263                                                      uint32_t *reqid,
264                                                      struct ctdb_ltdb_header *header,
265                                                      TDB_DATA *key, TDB_DATA *data)
266 {
267         if (r == NULL) {
268                 r = (struct ctdb_rec_data *)&m->data[0];
269         } else {
270                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
271         }
272
273         if (reqid != NULL) {
274                 *reqid = r->reqid;
275         }
276
277         if (key != NULL) {
278                 key->dptr   = &r->data[0];
279                 key->dsize  = r->keylen;
280         }
281         if (data != NULL) {
282                 data->dptr  = &r->data[r->keylen];
283                 data->dsize = r->datalen;
284                 if (header != NULL) {
285                         data->dptr += sizeof(*header);
286                         data->dsize -= sizeof(*header);
287                 }
288         }
289
290         if (header != NULL) {
291                 if (r->datalen < sizeof(*header)) {
292                         return NULL;
293                 }
294                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
295         }
296
297         return r;
298 }
299
300
301 static int32_t db_ctdb_transaction_active(uint32_t db_id)
302 {
303         int32_t status;
304         NTSTATUS ret;
305         TDB_DATA indata;
306
307         indata.dptr = (uint8_t *)&db_id;
308         indata.dsize = sizeof(db_id);
309
310         ret = ctdbd_control_local(messaging_ctdbd_connection(),
311                                   CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
312                                   indata, NULL, NULL, &status);
313
314         if (!NT_STATUS_IS_OK(ret)) {
315                 DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
316                 return -1;
317         }
318
319         return status;
320 }
321
322
323 /**
324  * CTDB transaction destructor
325  */
326 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
327 {
328         tdb_transaction_cancel(h->ctx->wtdb->tdb);
329         return 0;
330 }
331
332 /**
333  * start a transaction on a ctdb database:
334  * - lock the transaction lock key
335  * - start the tdb transaction
336  */
337 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
338 {
339         struct db_record *rh;
340         struct db_ctdb_rec *crec;
341         TDB_DATA key;
342         TALLOC_CTX *tmp_ctx;
343         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
344         int ret;
345         struct db_ctdb_ctx *ctx = h->ctx;
346         TDB_DATA data;
347         pid_t pid;
348         NTSTATUS status;
349         struct ctdb_ltdb_header header;
350
351         key.dptr = (uint8_t *)discard_const(keyname);
352         key.dsize = strlen(keyname);
353
354 again:
355         tmp_ctx = talloc_new(h);
356
357         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
358         if (rh == NULL) {
359                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
360                 talloc_free(tmp_ctx);
361                 return -1;
362         }
363         crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
364
365         /*
366          * store the pid in the database:
367          * it is not enought that the node is dmaster...
368          */
369         pid = getpid();
370         data.dptr = (unsigned char *)&pid;
371         data.dsize = sizeof(pid_t);
372         status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
373         if (!NT_STATUS_IS_OK(status)) {
374                 DEBUG(0, (__location__ " Failed to store pid in transaction "
375                           "record: %s\n", nt_errstr(status)));
376                 talloc_free(tmp_ctx);
377                 return -1;
378         }
379
380         talloc_free(rh);
381
382         ret = tdb_transaction_start(ctx->wtdb->tdb);
383         if (ret != 0) {
384                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
385                 talloc_free(tmp_ctx);
386                 return -1;
387         }
388
389         status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
390         if (!NT_STATUS_IS_OK(status) || header.dmaster != get_my_vnn()) {
391                 tdb_transaction_cancel(ctx->wtdb->tdb);
392                 talloc_free(tmp_ctx);
393                 goto again;
394         }
395
396         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
397                 tdb_transaction_cancel(ctx->wtdb->tdb);
398                 talloc_free(tmp_ctx);
399                 goto again;
400         }
401
402         talloc_free(tmp_ctx);
403
404         return 0;
405 }
406
407
408 /**
409  * CTDB dbwrap API: transaction_start function
410  * starts a transaction on a persistent database
411  */
412 static int db_ctdb_transaction_start(struct db_context *db)
413 {
414         struct db_ctdb_transaction_handle *h;
415         int ret;
416         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
417                                                         struct db_ctdb_ctx);
418
419         if (!db->persistent) {
420                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
421                          ctx->db_id));
422                 return -1;
423         }
424
425         if (ctx->transaction) {
426                 ctx->transaction->nesting++;
427                 return 0;
428         }
429
430         h = talloc_zero(db, struct db_ctdb_transaction_handle);
431         if (h == NULL) {
432                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
433                 return -1;
434         }
435
436         h->ctx = ctx;
437
438         ret = db_ctdb_transaction_fetch_start(h);
439         if (ret != 0) {
440                 talloc_free(h);
441                 return -1;
442         }
443
444         talloc_set_destructor(h, db_ctdb_transaction_destructor);
445
446         ctx->transaction = h;
447
448         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
449
450         return 0;
451 }
452
453
454
455 /*
456   fetch a record inside a transaction
457  */
458 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
459                                      TALLOC_CTX *mem_ctx, 
460                                      TDB_DATA key, TDB_DATA *data)
461 {
462         struct db_ctdb_transaction_handle *h = db->transaction;
463         NTSTATUS status;
464
465         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
466
467         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
468                 *data = tdb_null;
469         } else if (!NT_STATUS_IS_OK(status)) {
470                 return -1;
471         }
472
473         if (!h->in_replay) {
474                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
475                 if (h->m_all == NULL) {
476                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
477                         data->dsize = 0;
478                         talloc_free(data->dptr);
479                         return -1;
480                 }
481         }
482
483         return 0;
484 }
485
486
487 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
488 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
489
490 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
491                                                           TALLOC_CTX *mem_ctx,
492                                                           TDB_DATA key)
493 {
494         struct db_record *result;
495         TDB_DATA ctdb_data;
496
497         if (!(result = talloc(mem_ctx, struct db_record))) {
498                 DEBUG(0, ("talloc failed\n"));
499                 return NULL;
500         }
501
502         result->private_data = ctx->transaction;
503
504         result->key.dsize = key.dsize;
505         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
506         if (result->key.dptr == NULL) {
507                 DEBUG(0, ("talloc failed\n"));
508                 TALLOC_FREE(result);
509                 return NULL;
510         }
511
512         result->store = db_ctdb_store_transaction;
513         result->delete_rec = db_ctdb_delete_transaction;
514
515         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
516         if (ctdb_data.dptr == NULL) {
517                 /* create the record */
518                 result->value = tdb_null;
519                 return result;
520         }
521
522         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
523         result->value.dptr = NULL;
524
525         if ((result->value.dsize != 0)
526             && !(result->value.dptr = (uint8 *)talloc_memdup(
527                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
528                          result->value.dsize))) {
529                 DEBUG(0, ("talloc failed\n"));
530                 TALLOC_FREE(result);
531         }
532
533         SAFE_FREE(ctdb_data.dptr);
534
535         return result;
536 }
537
538 static int db_ctdb_record_destructor(struct db_record **recp)
539 {
540         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
541         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
542                 rec->private_data, struct db_ctdb_transaction_handle);
543         int ret = h->ctx->db->transaction_commit(h->ctx->db);
544         if (ret != 0) {
545                 DEBUG(0,(__location__ " transaction_commit failed\n"));
546         }
547         return 0;
548 }
549
550 /*
551   auto-create a transaction for persistent databases
552  */
553 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
554                                                          TALLOC_CTX *mem_ctx,
555                                                          TDB_DATA key)
556 {
557         int res;
558         struct db_record *rec, **recp;
559
560         res = db_ctdb_transaction_start(ctx->db);
561         if (res == -1) {
562                 return NULL;
563         }
564
565         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
566         if (rec == NULL) {
567                 ctx->db->transaction_cancel(ctx->db);           
568                 return NULL;
569         }
570
571         /* destroy this transaction when we release the lock */
572         recp = talloc(rec, struct db_record *);
573         if (recp == NULL) {
574                 ctx->db->transaction_cancel(ctx->db);
575                 talloc_free(rec);
576                 return NULL;
577         }
578         *recp = rec;
579         talloc_set_destructor(recp, db_ctdb_record_destructor);
580         return rec;
581 }
582
583
584 /*
585   stores a record inside a transaction
586  */
587 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
588                                      TDB_DATA key, TDB_DATA data)
589 {
590         TALLOC_CTX *tmp_ctx = talloc_new(h);
591         int ret;
592         TDB_DATA rec;
593         struct ctdb_ltdb_header header;
594         NTSTATUS status;
595
596         /* we need the header so we can update the RSN */
597         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
598         if (rec.dptr == NULL) {
599                 /* the record doesn't exist - create one with us as dmaster.
600                    This is only safe because we are in a transaction and this
601                    is a persistent database */
602                 ZERO_STRUCT(header);
603         } else {
604                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
605                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
606                 /* a special case, we are writing the same data that is there now */
607                 if (data.dsize == rec.dsize &&
608                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
609                         SAFE_FREE(rec.dptr);
610                         talloc_free(tmp_ctx);
611                         return 0;
612                 }
613                 SAFE_FREE(rec.dptr);
614         }
615
616         header.dmaster = get_my_vnn();
617         header.rsn++;
618
619         if (!h->in_replay) {
620                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
621                 if (h->m_all == NULL) {
622                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
623                         talloc_free(tmp_ctx);
624                         return -1;
625                 }
626         }
627
628         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
629         if (h->m_write == NULL) {
630                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
631                 talloc_free(tmp_ctx);
632                 return -1;
633         }
634
635         status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
636         if (NT_STATUS_IS_OK(status)) {
637                 ret = 0;
638         } else {
639                 ret = -1;
640         }
641
642         talloc_free(tmp_ctx);
643
644         return ret;
645 }
646
647
648 /* 
649    a record store inside a transaction
650  */
651 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
652 {
653         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
654                 rec->private_data, struct db_ctdb_transaction_handle);
655         int ret;
656
657         ret = db_ctdb_transaction_store(h, rec->key, data);
658         if (ret != 0) {
659                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
660         }
661         return NT_STATUS_OK;
662 }
663
664 /* 
665    a record delete inside a transaction
666  */
667 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
668 {
669         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
670                 rec->private_data, struct db_ctdb_transaction_handle);
671         int ret;
672
673         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
674         if (ret != 0) {
675                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
676         }
677         return NT_STATUS_OK;
678 }
679
680
681 /*
682   replay a transaction
683  */
684 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
685 {
686         int ret, i;
687         struct ctdb_rec_data *rec = NULL;
688
689         h->in_replay = true;
690         talloc_free(h->m_write);
691         h->m_write = NULL;
692
693         ret = db_ctdb_transaction_fetch_start(h);
694         if (ret != 0) {
695                 return ret;
696         }
697
698         for (i=0;i<h->m_all->count;i++) {
699                 TDB_DATA key, data;
700
701                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
702                 if (rec == NULL) {
703                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
704                         goto failed;
705                 }
706
707                 if (rec->reqid == 0) {
708                         /* its a store */
709                         if (db_ctdb_transaction_store(h, key, data) != 0) {
710                                 goto failed;
711                         }
712                 } else {
713                         TDB_DATA data2;
714                         TALLOC_CTX *tmp_ctx = talloc_new(h);
715
716                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
717                                 talloc_free(tmp_ctx);
718                                 goto failed;
719                         }
720                         if (data2.dsize != data.dsize ||
721                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
722                                 /* the record has changed on us - we have to give up */
723                                 talloc_free(tmp_ctx);
724                                 goto failed;
725                         }
726                         talloc_free(tmp_ctx);
727                 }
728         }
729
730         return 0;
731
732 failed:
733         tdb_transaction_cancel(h->ctx->wtdb->tdb);
734         return -1;
735 }
736
737
738 /*
739   commit a transaction
740  */
741 static int db_ctdb_transaction_commit(struct db_context *db)
742 {
743         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
744                                                         struct db_ctdb_ctx);
745         NTSTATUS rets;
746         int ret;
747         int status;
748         int retries = 0;
749         struct db_ctdb_transaction_handle *h = ctx->transaction;
750         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
751
752         if (h == NULL) {
753                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
754                 return -1;
755         }
756
757         if (h->nested_cancel) {
758                 db->transaction_cancel(db);
759                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
760                 return -1;
761         }
762
763         if (h->nesting != 0) {
764                 h->nesting--;
765                 return 0;
766         }
767
768         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
769
770         talloc_set_destructor(h, NULL);
771
772         /* our commit strategy is quite complex.
773
774            - we first try to commit the changes to all other nodes
775
776            - if that works, then we commit locally and we are done
777
778            - if a commit on another node fails, then we need to cancel
779              the transaction, then restart the transaction (thus
780              opening a window of time for a pending recovery to
781              complete), then replay the transaction, checking all the
782              reads and writes (checking that reads give the same data,
783              and writes succeed). Then we retry the transaction to the
784              other nodes
785         */
786
787 again:
788         if (h->m_write == NULL) {
789                 /* no changes were made, potentially after a retry */
790                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
791                 talloc_free(h);
792                 ctx->transaction = NULL;
793                 return 0;
794         }
795
796         /* tell ctdbd to commit to the other nodes */
797         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
798                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
799                                    h->ctx->db_id, 0,
800                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
801         if (!NT_STATUS_IS_OK(rets) || status != 0) {
802                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
803                 sleep(1);
804
805                 if (!NT_STATUS_IS_OK(rets)) {
806                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
807                 } else {
808                         /* work out what error code we will give if we 
809                            have to fail the operation */
810                         switch ((enum ctdb_trans2_commit_error)status) {
811                         case CTDB_TRANS2_COMMIT_SUCCESS:
812                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
813                         case CTDB_TRANS2_COMMIT_TIMEOUT:
814                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
815                                 break;
816                         case CTDB_TRANS2_COMMIT_ALLFAIL:
817                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
818                                 break;
819                         }
820                 }
821
822                 if (++retries == 5) {
823                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
824                                  h->ctx->db_id, retries, (unsigned)failure_control));
825                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
826                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
827                                             tdb_null, NULL, NULL, NULL);
828                         h->ctx->transaction = NULL;
829                         talloc_free(h);
830                         ctx->transaction = NULL;
831                         return -1;                      
832                 }
833
834                 if (ctdb_replay_transaction(h) != 0) {
835                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
836                                  (unsigned)failure_control));
837                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
838                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
839                                             tdb_null, NULL, NULL, NULL);
840                         h->ctx->transaction = NULL;
841                         talloc_free(h);
842                         ctx->transaction = NULL;
843                         return -1;
844                 }
845                 goto again;
846         } else {
847                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
848         }
849
850         /* do the real commit locally */
851         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
852         if (ret != 0) {
853                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
854                          (unsigned)failure_control));
855                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
856                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
857                 h->ctx->transaction = NULL;
858                 talloc_free(h);
859                 return ret;
860         }
861
862         /* tell ctdbd that we are finished with our local commit */
863         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
864                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
865                             tdb_null, NULL, NULL, NULL);
866         h->ctx->transaction = NULL;
867         talloc_free(h);
868         return 0;
869 }
870
871
872 /*
873   cancel a transaction
874  */
875 static int db_ctdb_transaction_cancel(struct db_context *db)
876 {
877         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
878                                                         struct db_ctdb_ctx);
879         struct db_ctdb_transaction_handle *h = ctx->transaction;
880
881         if (h == NULL) {
882                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
883                 return -1;
884         }
885
886         if (h->nesting != 0) {
887                 h->nesting--;
888                 h->nested_cancel = true;
889                 return 0;
890         }
891
892         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
893
894         ctx->transaction = NULL;
895         talloc_free(h);
896         return 0;
897 }
898
899
900 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
901 {
902         struct db_ctdb_rec *crec = talloc_get_type_abort(
903                 rec->private_data, struct db_ctdb_rec);
904
905         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
906 }
907
908
909
910 static NTSTATUS db_ctdb_delete(struct db_record *rec)
911 {
912         TDB_DATA data;
913
914         /*
915          * We have to store the header with empty data. TODO: Fix the
916          * tdb-level cleanup
917          */
918
919         ZERO_STRUCT(data);
920
921         return db_ctdb_store(rec, data, 0);
922
923 }
924
925 static int db_ctdb_record_destr(struct db_record* data)
926 {
927         struct db_ctdb_rec *crec = talloc_get_type_abort(
928                 data->private_data, struct db_ctdb_rec);
929
930         DEBUG(10, (DEBUGLEVEL > 10
931                    ? "Unlocking db %u key %s\n"
932                    : "Unlocking db %u key %.20s\n",
933                    (int)crec->ctdb_ctx->db_id,
934                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
935                               data->key.dsize)));
936
937         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
938                 DEBUG(0, ("tdb_chainunlock failed\n"));
939                 return -1;
940         }
941
942         return 0;
943 }
944
945 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
946                                                TALLOC_CTX *mem_ctx,
947                                                TDB_DATA key,
948                                                bool persistent)
949 {
950         struct db_record *result;
951         struct db_ctdb_rec *crec;
952         NTSTATUS status;
953         TDB_DATA ctdb_data;
954         int migrate_attempts = 0;
955
956         if (!(result = talloc(mem_ctx, struct db_record))) {
957                 DEBUG(0, ("talloc failed\n"));
958                 return NULL;
959         }
960
961         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
962                 DEBUG(0, ("talloc failed\n"));
963                 TALLOC_FREE(result);
964                 return NULL;
965         }
966
967         result->private_data = (void *)crec;
968         crec->ctdb_ctx = ctx;
969
970         result->key.dsize = key.dsize;
971         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
972         if (result->key.dptr == NULL) {
973                 DEBUG(0, ("talloc failed\n"));
974                 TALLOC_FREE(result);
975                 return NULL;
976         }
977
978         /*
979          * Do a blocking lock on the record
980          */
981 again:
982
983         if (DEBUGLEVEL >= 10) {
984                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
985                 DEBUG(10, (DEBUGLEVEL > 10
986                            ? "Locking db %u key %s\n"
987                            : "Locking db %u key %.20s\n",
988                            (int)crec->ctdb_ctx->db_id, keystr));
989                 TALLOC_FREE(keystr);
990         }
991
992         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
993                 DEBUG(3, ("tdb_chainlock failed\n"));
994                 TALLOC_FREE(result);
995                 return NULL;
996         }
997
998         result->store = db_ctdb_store;
999         result->delete_rec = db_ctdb_delete;
1000         talloc_set_destructor(result, db_ctdb_record_destr);
1001
1002         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1003
1004         /*
1005          * See if we have a valid record and we are the dmaster. If so, we can
1006          * take the shortcut and just return it.
1007          */
1008
1009         if ((ctdb_data.dptr == NULL) ||
1010             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
1011             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
1012 #if 0
1013             || (random() % 2 != 0)
1014 #endif
1015 ) {
1016                 SAFE_FREE(ctdb_data.dptr);
1017                 tdb_chainunlock(ctx->wtdb->tdb, key);
1018                 talloc_set_destructor(result, NULL);
1019
1020                 migrate_attempts += 1;
1021
1022                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
1023                            ctdb_data.dptr, ctdb_data.dptr ?
1024                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1025                            get_my_vnn()));
1026
1027                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
1028                 if (!NT_STATUS_IS_OK(status)) {
1029                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1030                                   nt_errstr(status)));
1031                         TALLOC_FREE(result);
1032                         return NULL;
1033                 }
1034                 /* now its migrated, try again */
1035                 goto again;
1036         }
1037
1038         if (migrate_attempts > 10) {
1039                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
1040                           migrate_attempts));
1041         }
1042
1043         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1044
1045         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1046         result->value.dptr = NULL;
1047
1048         if ((result->value.dsize != 0)
1049             && !(result->value.dptr = (uint8 *)talloc_memdup(
1050                          result, ctdb_data.dptr + sizeof(crec->header),
1051                          result->value.dsize))) {
1052                 DEBUG(0, ("talloc failed\n"));
1053                 TALLOC_FREE(result);
1054         }
1055
1056         SAFE_FREE(ctdb_data.dptr);
1057
1058         return result;
1059 }
1060
1061 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1062                                               TALLOC_CTX *mem_ctx,
1063                                               TDB_DATA key)
1064 {
1065         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1066                                                         struct db_ctdb_ctx);
1067
1068         if (ctx->transaction != NULL) {
1069                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1070         }
1071
1072         if (db->persistent) {
1073                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1074         }
1075
1076         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
1077 }
1078
1079 /*
1080   fetch (unlocked, no migration) operation on ctdb
1081  */
1082 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1083                          TDB_DATA key, TDB_DATA *data)
1084 {
1085         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1086                                                         struct db_ctdb_ctx);
1087         NTSTATUS status;
1088         TDB_DATA ctdb_data;
1089
1090         if (ctx->transaction) {
1091                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1092         }
1093
1094         /* try a direct fetch */
1095         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1096
1097         /*
1098          * See if we have a valid record and we are the dmaster. If so, we can
1099          * take the shortcut and just return it.
1100          * we bypass the dmaster check for persistent databases
1101          */
1102         if ((ctdb_data.dptr != NULL) &&
1103             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1104             (db->persistent ||
1105              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1106                 /* we are the dmaster - avoid the ctdb protocol op */
1107
1108                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1109                 if (data->dsize == 0) {
1110                         SAFE_FREE(ctdb_data.dptr);
1111                         data->dptr = NULL;
1112                         return 0;
1113                 }
1114
1115                 data->dptr = (uint8 *)talloc_memdup(
1116                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1117                         data->dsize);
1118
1119                 SAFE_FREE(ctdb_data.dptr);
1120
1121                 if (data->dptr == NULL) {
1122                         return -1;
1123                 }
1124                 return 0;
1125         }
1126
1127         SAFE_FREE(ctdb_data.dptr);
1128
1129         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1130         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1131         if (!NT_STATUS_IS_OK(status)) {
1132                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1133                 return -1;
1134         }
1135
1136         return 0;
1137 }
1138
1139 struct traverse_state {
1140         struct db_context *db;
1141         int (*fn)(struct db_record *rec, void *private_data);
1142         void *private_data;
1143 };
1144
1145 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1146 {
1147         struct traverse_state *state = (struct traverse_state *)private_data;
1148         struct db_record *rec;
1149         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1150         /* we have to give them a locked record to prevent races */
1151         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1152         if (rec && rec->value.dsize > 0) {
1153                 state->fn(rec, state->private_data);
1154         }
1155         talloc_free(tmp_ctx);
1156 }
1157
1158 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1159                                         void *private_data)
1160 {
1161         struct traverse_state *state = (struct traverse_state *)private_data;
1162         struct db_record *rec;
1163         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1164         int ret = 0;
1165         /* we have to give them a locked record to prevent races */
1166         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1167         if (rec && rec->value.dsize > 0) {
1168                 ret = state->fn(rec, state->private_data);
1169         }
1170         talloc_free(tmp_ctx);
1171         return ret;
1172 }
1173
1174 static int db_ctdb_traverse(struct db_context *db,
1175                             int (*fn)(struct db_record *rec,
1176                                       void *private_data),
1177                             void *private_data)
1178 {
1179         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1180                                                         struct db_ctdb_ctx);
1181         struct traverse_state state;
1182
1183         state.db = db;
1184         state.fn = fn;
1185         state.private_data = private_data;
1186
1187         if (db->persistent) {
1188                 /* for persistent databases we don't need to do a ctdb traverse,
1189                    we can do a faster local traverse */
1190                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1191         }
1192
1193
1194         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1195         return 0;
1196 }
1197
1198 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1199 {
1200         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1201 }
1202
1203 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1204 {
1205         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1206 }
1207
1208 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1209 {
1210         struct traverse_state *state = (struct traverse_state *)private_data;
1211         struct db_record rec;
1212         rec.key = key;
1213         rec.value = data;
1214         rec.store = db_ctdb_store_deny;
1215         rec.delete_rec = db_ctdb_delete_deny;
1216         rec.private_data = state->db;
1217         state->fn(&rec, state->private_data);
1218 }
1219
1220 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1221                                         void *private_data)
1222 {
1223         struct traverse_state *state = (struct traverse_state *)private_data;
1224         struct db_record rec;
1225         rec.key = kbuf;
1226         rec.value = dbuf;
1227         rec.store = db_ctdb_store_deny;
1228         rec.delete_rec = db_ctdb_delete_deny;
1229         rec.private_data = state->db;
1230
1231         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1232                 /* a deleted record */
1233                 return 0;
1234         }
1235         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1236         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1237
1238         return state->fn(&rec, state->private_data);
1239 }
1240
1241 static int db_ctdb_traverse_read(struct db_context *db,
1242                                  int (*fn)(struct db_record *rec,
1243                                            void *private_data),
1244                                  void *private_data)
1245 {
1246         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1247                                                         struct db_ctdb_ctx);
1248         struct traverse_state state;
1249
1250         state.db = db;
1251         state.fn = fn;
1252         state.private_data = private_data;
1253
1254         if (db->persistent) {
1255                 /* for persistent databases we don't need to do a ctdb traverse,
1256                    we can do a faster local traverse */
1257                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1258         }
1259
1260         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1261         return 0;
1262 }
1263
1264 static int db_ctdb_get_seqnum(struct db_context *db)
1265 {
1266         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1267                                                         struct db_ctdb_ctx);
1268         return tdb_get_seqnum(ctx->wtdb->tdb);
1269 }
1270
1271 static int db_ctdb_get_flags(struct db_context *db)
1272 {
1273         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1274                                                         struct db_ctdb_ctx);
1275         return tdb_get_flags(ctx->wtdb->tdb);
1276 }
1277
1278 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1279                                 const char *name,
1280                                 int hash_size, int tdb_flags,
1281                                 int open_flags, mode_t mode)
1282 {
1283         struct db_context *result;
1284         struct db_ctdb_ctx *db_ctdb;
1285         char *db_path;
1286
1287         if (!lp_clustering()) {
1288                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1289                 return NULL;
1290         }
1291
1292         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1293                 DEBUG(0, ("talloc failed\n"));
1294                 TALLOC_FREE(result);
1295                 return NULL;
1296         }
1297
1298         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1299                 DEBUG(0, ("talloc failed\n"));
1300                 TALLOC_FREE(result);
1301                 return NULL;
1302         }
1303
1304         db_ctdb->transaction = NULL;
1305         db_ctdb->db = result;
1306
1307         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1308                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1309                 TALLOC_FREE(result);
1310                 return NULL;
1311         }
1312
1313         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1314
1315         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1316
1317         /* only pass through specific flags */
1318         tdb_flags &= TDB_SEQNUM;
1319
1320         /* honor permissions if user has specified O_CREAT */
1321         if (open_flags & O_CREAT) {
1322                 chmod(db_path, mode);
1323         }
1324
1325         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1326         if (db_ctdb->wtdb == NULL) {
1327                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1328                 TALLOC_FREE(result);
1329                 return NULL;
1330         }
1331         talloc_free(db_path);
1332
1333         result->private_data = (void *)db_ctdb;
1334         result->fetch_locked = db_ctdb_fetch_locked;
1335         result->fetch = db_ctdb_fetch;
1336         result->traverse = db_ctdb_traverse;
1337         result->traverse_read = db_ctdb_traverse_read;
1338         result->get_seqnum = db_ctdb_get_seqnum;
1339         result->get_flags = db_ctdb_get_flags;
1340         result->transaction_start = db_ctdb_transaction_start;
1341         result->transaction_commit = db_ctdb_transaction_commit;
1342         result->transaction_cancel = db_ctdb_transaction_cancel;
1343
1344         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1345                  name, db_ctdb->db_id));
1346
1347         return result;
1348 }
1349 #endif