Merge branch 'master' of ssh://git.samba.org/data/git/samba into master-devel
[ira/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34         uint32_t nesting;
35         bool nested_cancel;
36 };
37
38 struct db_ctdb_ctx {
39         struct db_context *db;
40         struct tdb_wrap *wtdb;
41         uint32 db_id;
42         struct db_ctdb_transaction_handle *transaction;
43 };
44
45 struct db_ctdb_rec {
46         struct db_ctdb_ctx *ctdb_ctx;
47         struct ctdb_ltdb_header header;
48 };
49
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51                                                TALLOC_CTX *mem_ctx,
52                                                TDB_DATA key,
53                                                bool persistent);
54
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 {
57         NTSTATUS status;
58         enum TDB_ERROR tret = tdb_error(tdb);
59
60         switch (tret) {
61         case TDB_ERR_EXISTS:
62                 status = NT_STATUS_OBJECT_NAME_COLLISION;
63                 break;
64         case TDB_ERR_NOEXIST:
65                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66                 break;
67         default:
68                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69                 break;
70         }
71
72         return status;
73 }
74
75
76
77 /*
78   form a ctdb_rec_data record from a key/data pair
79
80   note that header may be NULL. If not NULL then it is included in the data portion
81   of the record
82  */
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
84                                                   TDB_DATA key, 
85                                                   struct ctdb_ltdb_header *header,
86                                                   TDB_DATA data)
87 {
88         size_t length;
89         struct ctdb_rec_data *d;
90
91         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
92                 data.dsize + (header?sizeof(*header):0);
93         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94         if (d == NULL) {
95                 return NULL;
96         }
97         d->length = length;
98         d->reqid = reqid;
99         d->keylen = key.dsize;
100         memcpy(&d->data[0], key.dptr, key.dsize);
101         if (header) {
102                 d->datalen = data.dsize + sizeof(*header);
103                 memcpy(&d->data[key.dsize], header, sizeof(*header));
104                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105         } else {
106                 d->datalen = data.dsize;
107                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
108         }
109         return d;
110 }
111
112
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
115                                                struct ctdb_marshall_buffer *m,
116                                                uint64_t db_id,
117                                                uint32_t reqid,
118                                                TDB_DATA key,
119                                                struct ctdb_ltdb_header *header,
120                                                TDB_DATA data)
121 {
122         struct ctdb_rec_data *r;
123         size_t m_size, r_size;
124         struct ctdb_marshall_buffer *m2 = NULL;
125
126         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
127         if (r == NULL) {
128                 talloc_free(m);
129                 return NULL;
130         }
131
132         if (m == NULL) {
133                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135                 if (m == NULL) {
136                         goto done;
137                 }
138                 m->db_id = db_id;
139         }
140
141         m_size = talloc_get_size(m);
142         r_size = talloc_get_size(r);
143
144         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145                 mem_ctx, m,  m_size + r_size);
146         if (m2 == NULL) {
147                 talloc_free(m);
148                 goto done;
149         }
150
151         memcpy(m_size + (uint8_t *)m2, r, r_size);
152
153         m2->count++;
154
155 done:
156         talloc_free(r);
157         return m2;
158 }
159
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
162 {
163         TDB_DATA data;
164         data.dptr = (uint8_t *)m;
165         data.dsize = talloc_get_size(m);
166         return data;
167 }
168
169 /* 
170    loop over a marshalling buffer 
171
172      - pass r==NULL to start
173      - loop the number of times indicated by m->count
174 */
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176                                                      uint32_t *reqid,
177                                                      struct ctdb_ltdb_header *header,
178                                                      TDB_DATA *key, TDB_DATA *data)
179 {
180         if (r == NULL) {
181                 r = (struct ctdb_rec_data *)&m->data[0];
182         } else {
183                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184         }
185
186         if (reqid != NULL) {
187                 *reqid = r->reqid;
188         }
189
190         if (key != NULL) {
191                 key->dptr   = &r->data[0];
192                 key->dsize  = r->keylen;
193         }
194         if (data != NULL) {
195                 data->dptr  = &r->data[r->keylen];
196                 data->dsize = r->datalen;
197                 if (header != NULL) {
198                         data->dptr += sizeof(*header);
199                         data->dsize -= sizeof(*header);
200                 }
201         }
202
203         if (header != NULL) {
204                 if (r->datalen < sizeof(*header)) {
205                         return NULL;
206                 }
207                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208         }
209
210         return r;
211 }
212
213
214
215 /* start a transaction on a database */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
217 {
218         tdb_transaction_cancel(h->ctx->wtdb->tdb);
219         return 0;
220 }
221
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
224 {
225         struct db_record *rh;
226         TDB_DATA key;
227         TALLOC_CTX *tmp_ctx;
228         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
229         int ret;
230         struct db_ctdb_ctx *ctx = h->ctx;
231         TDB_DATA data;
232
233         key.dptr = (uint8_t *)discard_const(keyname);
234         key.dsize = strlen(keyname);
235
236 again:
237         tmp_ctx = talloc_new(h);
238
239         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
240         if (rh == NULL) {
241                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245         talloc_free(rh);
246
247         ret = tdb_transaction_start(ctx->wtdb->tdb);
248         if (ret != 0) {
249                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250                 talloc_free(tmp_ctx);
251                 return -1;
252         }
253
254         data = tdb_fetch(ctx->wtdb->tdb, key);
255         if ((data.dptr == NULL) ||
256             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258                 SAFE_FREE(data.dptr);
259                 tdb_transaction_cancel(ctx->wtdb->tdb);
260                 talloc_free(tmp_ctx);
261                 goto again;
262         }
263
264         SAFE_FREE(data.dptr);
265         talloc_free(tmp_ctx);
266
267         return 0;
268 }
269
270
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context *db)
273 {
274         struct db_ctdb_transaction_handle *h;
275         int ret;
276         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
277                                                         struct db_ctdb_ctx);
278
279         if (!db->persistent) {
280                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
281                          ctx->db_id));
282                 return -1;
283         }
284
285         if (ctx->transaction) {
286                 ctx->transaction->nesting++;
287                 return 0;
288         }
289
290         h = talloc_zero(db, struct db_ctdb_transaction_handle);
291         if (h == NULL) {
292                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
293                 return -1;
294         }
295
296         h->ctx = ctx;
297
298         ret = db_ctdb_transaction_fetch_start(h);
299         if (ret != 0) {
300                 talloc_free(h);
301                 return -1;
302         }
303
304         talloc_set_destructor(h, db_ctdb_transaction_destructor);
305
306         ctx->transaction = h;
307
308         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
309
310         return 0;
311 }
312
313
314
315 /*
316   fetch a record inside a transaction
317  */
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
319                                      TALLOC_CTX *mem_ctx, 
320                                      TDB_DATA key, TDB_DATA *data)
321 {
322         struct db_ctdb_transaction_handle *h = db->transaction;
323
324         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
325
326         if (data->dptr != NULL) {
327                 uint8_t *oldptr = (uint8_t *)data->dptr;
328                 data->dsize -= sizeof(struct ctdb_ltdb_header);
329                 if (data->dsize == 0) {
330                         data->dptr = NULL;
331                 } else {
332                         data->dptr = (uint8 *)
333                                 talloc_memdup(
334                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
335                                         data->dsize);
336                 }
337                 SAFE_FREE(oldptr);
338                 if (data->dptr == NULL && data->dsize != 0) {
339                         return -1;
340                 }
341         }
342
343         if (!h->in_replay) {
344                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345                 if (h->m_all == NULL) {
346                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
347                         data->dsize = 0;
348                         talloc_free(data->dptr);
349                         return -1;
350                 }
351         }
352
353         return 0;
354 }
355
356
357 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
359
360 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
361                                                           TALLOC_CTX *mem_ctx,
362                                                           TDB_DATA key)
363 {
364         struct db_record *result;
365         TDB_DATA ctdb_data;
366
367         if (!(result = talloc(mem_ctx, struct db_record))) {
368                 DEBUG(0, ("talloc failed\n"));
369                 return NULL;
370         }
371
372         result->private_data = ctx->transaction;
373
374         result->key.dsize = key.dsize;
375         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376         if (result->key.dptr == NULL) {
377                 DEBUG(0, ("talloc failed\n"));
378                 TALLOC_FREE(result);
379                 return NULL;
380         }
381
382         result->store = db_ctdb_store_transaction;
383         result->delete_rec = db_ctdb_delete_transaction;
384
385         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386         if (ctdb_data.dptr == NULL) {
387                 /* create the record */
388                 result->value = tdb_null;
389                 return result;
390         }
391
392         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393         result->value.dptr = NULL;
394
395         if ((result->value.dsize != 0)
396             && !(result->value.dptr = (uint8 *)talloc_memdup(
397                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398                          result->value.dsize))) {
399                 DEBUG(0, ("talloc failed\n"));
400                 TALLOC_FREE(result);
401         }
402
403         SAFE_FREE(ctdb_data.dptr);
404
405         return result;
406 }
407
408 static int db_ctdb_record_destructor(struct db_record **recp)
409 {
410         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
411         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
412                 rec->private_data, struct db_ctdb_transaction_handle);
413         int ret = h->ctx->db->transaction_commit(h->ctx->db);
414         if (ret != 0) {
415                 DEBUG(0,(__location__ " transaction_commit failed\n"));
416         }
417         return 0;
418 }
419
420 /*
421   auto-create a transaction for persistent databases
422  */
423 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
424                                                          TALLOC_CTX *mem_ctx,
425                                                          TDB_DATA key)
426 {
427         int res;
428         struct db_record *rec, **recp;
429
430         res = db_ctdb_transaction_start(ctx->db);
431         if (res == -1) {
432                 return NULL;
433         }
434
435         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
436         if (rec == NULL) {
437                 ctx->db->transaction_cancel(ctx->db);           
438                 return NULL;
439         }
440
441         /* destroy this transaction when we release the lock */
442         recp = talloc(rec, struct db_record *);
443         if (recp == NULL) {
444                 ctx->db->transaction_cancel(ctx->db);
445                 talloc_free(rec);
446                 return NULL;
447         }
448         *recp = rec;
449         talloc_set_destructor(recp, db_ctdb_record_destructor);
450         return rec;
451 }
452
453
454 /*
455   stores a record inside a transaction
456  */
457 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
458                                      TDB_DATA key, TDB_DATA data)
459 {
460         TALLOC_CTX *tmp_ctx = talloc_new(h);
461         int ret;
462         TDB_DATA rec;
463         struct ctdb_ltdb_header header;
464
465         /* we need the header so we can update the RSN */
466         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
467         if (rec.dptr == NULL) {
468                 /* the record doesn't exist - create one with us as dmaster.
469                    This is only safe because we are in a transaction and this
470                    is a persistent database */
471                 ZERO_STRUCT(header);
472                 header.dmaster = get_my_vnn();
473         } else {
474                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
475                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
476                 /* a special case, we are writing the same data that is there now */
477                 if (data.dsize == rec.dsize &&
478                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
479                         SAFE_FREE(rec.dptr);
480                         talloc_free(tmp_ctx);
481                         return 0;
482                 }
483                 SAFE_FREE(rec.dptr);
484         }
485
486         header.rsn++;
487
488         if (!h->in_replay) {
489                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
490                 if (h->m_all == NULL) {
491                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
492                         talloc_free(tmp_ctx);
493                         return -1;
494                 }
495         }
496
497         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
498         if (h->m_write == NULL) {
499                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
500                 talloc_free(tmp_ctx);
501                 return -1;
502         }
503
504         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
505         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
506         if (rec.dptr == NULL) {
507                 DEBUG(0,(__location__ " Failed to alloc record\n"));
508                 talloc_free(tmp_ctx);
509                 return -1;
510         }
511         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
512         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
513
514         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
515
516         talloc_free(tmp_ctx);
517
518         return ret;
519 }
520
521
522 /* 
523    a record store inside a transaction
524  */
525 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
526 {
527         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
528                 rec->private_data, struct db_ctdb_transaction_handle);
529         int ret;
530
531         ret = db_ctdb_transaction_store(h, rec->key, data);
532         if (ret != 0) {
533                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
534         }
535         return NT_STATUS_OK;
536 }
537
538 /* 
539    a record delete inside a transaction
540  */
541 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
542 {
543         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
544                 rec->private_data, struct db_ctdb_transaction_handle);
545         int ret;
546
547         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
548         if (ret != 0) {
549                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
550         }
551         return NT_STATUS_OK;
552 }
553
554
555 /*
556   replay a transaction
557  */
558 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
559 {
560         int ret, i;
561         struct ctdb_rec_data *rec = NULL;
562
563         h->in_replay = true;
564         talloc_free(h->m_write);
565         h->m_write = NULL;
566
567         ret = db_ctdb_transaction_fetch_start(h);
568         if (ret != 0) {
569                 return ret;
570         }
571
572         for (i=0;i<h->m_all->count;i++) {
573                 TDB_DATA key, data;
574
575                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
576                 if (rec == NULL) {
577                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
578                         goto failed;
579                 }
580
581                 if (rec->reqid == 0) {
582                         /* its a store */
583                         if (db_ctdb_transaction_store(h, key, data) != 0) {
584                                 goto failed;
585                         }
586                 } else {
587                         TDB_DATA data2;
588                         TALLOC_CTX *tmp_ctx = talloc_new(h);
589
590                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
591                                 talloc_free(tmp_ctx);
592                                 goto failed;
593                         }
594                         if (data2.dsize != data.dsize ||
595                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
596                                 /* the record has changed on us - we have to give up */
597                                 talloc_free(tmp_ctx);
598                                 goto failed;
599                         }
600                         talloc_free(tmp_ctx);
601                 }
602         }
603
604         return 0;
605
606 failed:
607         tdb_transaction_cancel(h->ctx->wtdb->tdb);
608         return -1;
609 }
610
611
612 /*
613   commit a transaction
614  */
615 static int db_ctdb_transaction_commit(struct db_context *db)
616 {
617         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
618                                                         struct db_ctdb_ctx);
619         NTSTATUS rets;
620         int ret;
621         int status;
622         int retries = 0;
623         struct db_ctdb_transaction_handle *h = ctx->transaction;
624         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
625
626         if (h == NULL) {
627                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
628                 return -1;
629         }
630
631         if (h->nested_cancel) {
632                 db->transaction_cancel(db);
633                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
634                 return -1;
635         }
636
637         if (h->nesting != 0) {
638                 h->nesting--;
639                 return 0;
640         }
641
642         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
643
644         talloc_set_destructor(h, NULL);
645
646         /* our commit strategy is quite complex.
647
648            - we first try to commit the changes to all other nodes
649
650            - if that works, then we commit locally and we are done
651
652            - if a commit on another node fails, then we need to cancel
653              the transaction, then restart the transaction (thus
654              opening a window of time for a pending recovery to
655              complete), then replay the transaction, checking all the
656              reads and writes (checking that reads give the same data,
657              and writes succeed). Then we retry the transaction to the
658              other nodes
659         */
660
661 again:
662         if (h->m_write == NULL) {
663                 /* no changes were made, potentially after a retry */
664                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
665                 talloc_free(h);
666                 ctx->transaction = NULL;
667                 return 0;
668         }
669
670         /* tell ctdbd to commit to the other nodes */
671         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
672                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
673                                    h->ctx->db_id, 0,
674                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
675         if (!NT_STATUS_IS_OK(rets) || status != 0) {
676                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
677                 sleep(1);
678
679                 if (!NT_STATUS_IS_OK(rets)) {
680                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
681                 } else {
682                         /* work out what error code we will give if we 
683                            have to fail the operation */
684                         switch ((enum ctdb_trans2_commit_error)status) {
685                         case CTDB_TRANS2_COMMIT_SUCCESS:
686                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
687                         case CTDB_TRANS2_COMMIT_TIMEOUT:
688                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
689                                 break;
690                         case CTDB_TRANS2_COMMIT_ALLFAIL:
691                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
692                                 break;
693                         }
694                 }
695
696                 if (++retries == 5) {
697                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
698                                  h->ctx->db_id, retries, (unsigned)failure_control));
699                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
700                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
701                                             tdb_null, NULL, NULL, NULL);
702                         h->ctx->transaction = NULL;
703                         talloc_free(h);
704                         ctx->transaction = NULL;
705                         return -1;                      
706                 }
707
708                 if (ctdb_replay_transaction(h) != 0) {
709                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
710                                  (unsigned)failure_control));
711                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
712                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
713                                             tdb_null, NULL, NULL, NULL);
714                         h->ctx->transaction = NULL;
715                         talloc_free(h);
716                         ctx->transaction = NULL;
717                         return -1;
718                 }
719                 goto again;
720         } else {
721                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
722         }
723
724         /* do the real commit locally */
725         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
726         if (ret != 0) {
727                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
728                          (unsigned)failure_control));
729                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
730                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
731                 h->ctx->transaction = NULL;
732                 talloc_free(h);
733                 return ret;
734         }
735
736         /* tell ctdbd that we are finished with our local commit */
737         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
738                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
739                             tdb_null, NULL, NULL, NULL);
740         h->ctx->transaction = NULL;
741         talloc_free(h);
742         return 0;
743 }
744
745
746 /*
747   cancel a transaction
748  */
749 static int db_ctdb_transaction_cancel(struct db_context *db)
750 {
751         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
752                                                         struct db_ctdb_ctx);
753         struct db_ctdb_transaction_handle *h = ctx->transaction;
754
755         if (h == NULL) {
756                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
757                 return -1;
758         }
759
760         if (h->nesting != 0) {
761                 h->nesting--;
762                 h->nested_cancel = true;
763                 return 0;
764         }
765
766         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
767
768         ctx->transaction = NULL;
769         talloc_free(h);
770         return 0;
771 }
772
773
774 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
775 {
776         struct db_ctdb_rec *crec = talloc_get_type_abort(
777                 rec->private_data, struct db_ctdb_rec);
778         TDB_DATA cdata;
779         int ret;
780
781         cdata.dsize = sizeof(crec->header) + data.dsize;
782
783         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
784                 return NT_STATUS_NO_MEMORY;
785         }
786
787         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
788         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
789
790         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
791
792         SAFE_FREE(cdata.dptr);
793
794         return (ret == 0) ? NT_STATUS_OK
795                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
796 }
797
798
799
800 static NTSTATUS db_ctdb_delete(struct db_record *rec)
801 {
802         TDB_DATA data;
803
804         /*
805          * We have to store the header with empty data. TODO: Fix the
806          * tdb-level cleanup
807          */
808
809         ZERO_STRUCT(data);
810
811         return db_ctdb_store(rec, data, 0);
812
813 }
814
815 static int db_ctdb_record_destr(struct db_record* data)
816 {
817         struct db_ctdb_rec *crec = talloc_get_type_abort(
818                 data->private_data, struct db_ctdb_rec);
819
820         DEBUG(10, (DEBUGLEVEL > 10
821                    ? "Unlocking db %u key %s\n"
822                    : "Unlocking db %u key %.20s\n",
823                    (int)crec->ctdb_ctx->db_id,
824                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
825                               data->key.dsize)));
826
827         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
828                 DEBUG(0, ("tdb_chainunlock failed\n"));
829                 return -1;
830         }
831
832         return 0;
833 }
834
835 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
836                                                TALLOC_CTX *mem_ctx,
837                                                TDB_DATA key,
838                                                bool persistent)
839 {
840         struct db_record *result;
841         struct db_ctdb_rec *crec;
842         NTSTATUS status;
843         TDB_DATA ctdb_data;
844         int migrate_attempts = 0;
845
846         if (!(result = talloc(mem_ctx, struct db_record))) {
847                 DEBUG(0, ("talloc failed\n"));
848                 return NULL;
849         }
850
851         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
852                 DEBUG(0, ("talloc failed\n"));
853                 TALLOC_FREE(result);
854                 return NULL;
855         }
856
857         result->private_data = (void *)crec;
858         crec->ctdb_ctx = ctx;
859
860         result->key.dsize = key.dsize;
861         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
862         if (result->key.dptr == NULL) {
863                 DEBUG(0, ("talloc failed\n"));
864                 TALLOC_FREE(result);
865                 return NULL;
866         }
867
868         /*
869          * Do a blocking lock on the record
870          */
871 again:
872
873         if (DEBUGLEVEL >= 10) {
874                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
875                 DEBUG(10, (DEBUGLEVEL > 10
876                            ? "Locking db %u key %s\n"
877                            : "Locking db %u key %.20s\n",
878                            (int)crec->ctdb_ctx->db_id, keystr));
879                 TALLOC_FREE(keystr);
880         }
881
882         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
883                 DEBUG(3, ("tdb_chainlock failed\n"));
884                 TALLOC_FREE(result);
885                 return NULL;
886         }
887
888         result->store = db_ctdb_store;
889         result->delete_rec = db_ctdb_delete;
890         talloc_set_destructor(result, db_ctdb_record_destr);
891
892         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
893
894         /*
895          * See if we have a valid record and we are the dmaster. If so, we can
896          * take the shortcut and just return it.
897          */
898
899         if ((ctdb_data.dptr == NULL) ||
900             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
901             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
902 #if 0
903             || (random() % 2 != 0)
904 #endif
905 ) {
906                 SAFE_FREE(ctdb_data.dptr);
907                 tdb_chainunlock(ctx->wtdb->tdb, key);
908                 talloc_set_destructor(result, NULL);
909
910                 migrate_attempts += 1;
911
912                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
913                            ctdb_data.dptr, ctdb_data.dptr ?
914                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
915                            get_my_vnn()));
916
917                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
918                 if (!NT_STATUS_IS_OK(status)) {
919                         DEBUG(5, ("ctdb_migrate failed: %s\n",
920                                   nt_errstr(status)));
921                         TALLOC_FREE(result);
922                         return NULL;
923                 }
924                 /* now its migrated, try again */
925                 goto again;
926         }
927
928         if (migrate_attempts > 10) {
929                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
930                           migrate_attempts));
931         }
932
933         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
934
935         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
936         result->value.dptr = NULL;
937
938         if ((result->value.dsize != 0)
939             && !(result->value.dptr = (uint8 *)talloc_memdup(
940                          result, ctdb_data.dptr + sizeof(crec->header),
941                          result->value.dsize))) {
942                 DEBUG(0, ("talloc failed\n"));
943                 TALLOC_FREE(result);
944         }
945
946         SAFE_FREE(ctdb_data.dptr);
947
948         return result;
949 }
950
951 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
952                                               TALLOC_CTX *mem_ctx,
953                                               TDB_DATA key)
954 {
955         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
956                                                         struct db_ctdb_ctx);
957
958         if (ctx->transaction != NULL) {
959                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
960         }
961
962         if (db->persistent) {
963                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
964         }
965
966         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
967 }
968
969 /*
970   fetch (unlocked, no migration) operation on ctdb
971  */
972 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
973                          TDB_DATA key, TDB_DATA *data)
974 {
975         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
976                                                         struct db_ctdb_ctx);
977         NTSTATUS status;
978         TDB_DATA ctdb_data;
979
980         if (ctx->transaction) {
981                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
982         }
983
984         /* try a direct fetch */
985         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
986
987         /*
988          * See if we have a valid record and we are the dmaster. If so, we can
989          * take the shortcut and just return it.
990          * we bypass the dmaster check for persistent databases
991          */
992         if ((ctdb_data.dptr != NULL) &&
993             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
994             (db->persistent ||
995              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
996                 /* we are the dmaster - avoid the ctdb protocol op */
997
998                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
999                 if (data->dsize == 0) {
1000                         SAFE_FREE(ctdb_data.dptr);
1001                         data->dptr = NULL;
1002                         return 0;
1003                 }
1004
1005                 data->dptr = (uint8 *)talloc_memdup(
1006                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1007                         data->dsize);
1008
1009                 SAFE_FREE(ctdb_data.dptr);
1010
1011                 if (data->dptr == NULL) {
1012                         return -1;
1013                 }
1014                 return 0;
1015         }
1016
1017         SAFE_FREE(ctdb_data.dptr);
1018
1019         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1020         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1021         if (!NT_STATUS_IS_OK(status)) {
1022                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1023                 return -1;
1024         }
1025
1026         return 0;
1027 }
1028
1029 struct traverse_state {
1030         struct db_context *db;
1031         int (*fn)(struct db_record *rec, void *private_data);
1032         void *private_data;
1033 };
1034
1035 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1036 {
1037         struct traverse_state *state = (struct traverse_state *)private_data;
1038         struct db_record *rec;
1039         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1040         /* we have to give them a locked record to prevent races */
1041         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1042         if (rec && rec->value.dsize > 0) {
1043                 state->fn(rec, state->private_data);
1044         }
1045         talloc_free(tmp_ctx);
1046 }
1047
1048 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1049                                         void *private_data)
1050 {
1051         struct traverse_state *state = (struct traverse_state *)private_data;
1052         struct db_record *rec;
1053         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1054         int ret = 0;
1055         /* we have to give them a locked record to prevent races */
1056         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1057         if (rec && rec->value.dsize > 0) {
1058                 ret = state->fn(rec, state->private_data);
1059         }
1060         talloc_free(tmp_ctx);
1061         return ret;
1062 }
1063
1064 static int db_ctdb_traverse(struct db_context *db,
1065                             int (*fn)(struct db_record *rec,
1066                                       void *private_data),
1067                             void *private_data)
1068 {
1069         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1070                                                         struct db_ctdb_ctx);
1071         struct traverse_state state;
1072
1073         state.db = db;
1074         state.fn = fn;
1075         state.private_data = private_data;
1076
1077         if (db->persistent) {
1078                 /* for persistent databases we don't need to do a ctdb traverse,
1079                    we can do a faster local traverse */
1080                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1081         }
1082
1083
1084         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1085         return 0;
1086 }
1087
1088 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1089 {
1090         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1091 }
1092
1093 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1094 {
1095         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1096 }
1097
1098 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1099 {
1100         struct traverse_state *state = (struct traverse_state *)private_data;
1101         struct db_record rec;
1102         rec.key = key;
1103         rec.value = data;
1104         rec.store = db_ctdb_store_deny;
1105         rec.delete_rec = db_ctdb_delete_deny;
1106         rec.private_data = state->db;
1107         state->fn(&rec, state->private_data);
1108 }
1109
1110 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1111                                         void *private_data)
1112 {
1113         struct traverse_state *state = (struct traverse_state *)private_data;
1114         struct db_record rec;
1115         rec.key = kbuf;
1116         rec.value = dbuf;
1117         rec.store = db_ctdb_store_deny;
1118         rec.delete_rec = db_ctdb_delete_deny;
1119         rec.private_data = state->db;
1120
1121         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1122                 /* a deleted record */
1123                 return 0;
1124         }
1125         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1126         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1127
1128         return state->fn(&rec, state->private_data);
1129 }
1130
1131 static int db_ctdb_traverse_read(struct db_context *db,
1132                                  int (*fn)(struct db_record *rec,
1133                                            void *private_data),
1134                                  void *private_data)
1135 {
1136         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1137                                                         struct db_ctdb_ctx);
1138         struct traverse_state state;
1139
1140         state.db = db;
1141         state.fn = fn;
1142         state.private_data = private_data;
1143
1144         if (db->persistent) {
1145                 /* for persistent databases we don't need to do a ctdb traverse,
1146                    we can do a faster local traverse */
1147                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1148         }
1149
1150         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1151         return 0;
1152 }
1153
1154 static int db_ctdb_get_seqnum(struct db_context *db)
1155 {
1156         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1157                                                         struct db_ctdb_ctx);
1158         return tdb_get_seqnum(ctx->wtdb->tdb);
1159 }
1160
1161 static int db_ctdb_get_flags(struct db_context *db)
1162 {
1163         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1164                                                         struct db_ctdb_ctx);
1165         return tdb_get_flags(ctx->wtdb->tdb);
1166 }
1167
1168 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1169                                 const char *name,
1170                                 int hash_size, int tdb_flags,
1171                                 int open_flags, mode_t mode)
1172 {
1173         struct db_context *result;
1174         struct db_ctdb_ctx *db_ctdb;
1175         char *db_path;
1176
1177         if (!lp_clustering()) {
1178                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1179                 return NULL;
1180         }
1181
1182         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1183                 DEBUG(0, ("talloc failed\n"));
1184                 TALLOC_FREE(result);
1185                 return NULL;
1186         }
1187
1188         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1189                 DEBUG(0, ("talloc failed\n"));
1190                 TALLOC_FREE(result);
1191                 return NULL;
1192         }
1193
1194         db_ctdb->transaction = NULL;
1195         db_ctdb->db = result;
1196
1197         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1198                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1199                 TALLOC_FREE(result);
1200                 return NULL;
1201         }
1202
1203         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1204
1205         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1206
1207         /* only pass through specific flags */
1208         tdb_flags &= TDB_SEQNUM;
1209
1210         /* honor permissions if user has specified O_CREAT */
1211         if (open_flags & O_CREAT) {
1212                 chmod(db_path, mode);
1213         }
1214
1215         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1216         if (db_ctdb->wtdb == NULL) {
1217                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1218                 TALLOC_FREE(result);
1219                 return NULL;
1220         }
1221         talloc_free(db_path);
1222
1223         result->private_data = (void *)db_ctdb;
1224         result->fetch_locked = db_ctdb_fetch_locked;
1225         result->fetch = db_ctdb_fetch;
1226         result->traverse = db_ctdb_traverse;
1227         result->traverse_read = db_ctdb_traverse_read;
1228         result->get_seqnum = db_ctdb_get_seqnum;
1229         result->get_flags = db_ctdb_get_flags;
1230         result->transaction_start = db_ctdb_transaction_start;
1231         result->transaction_commit = db_ctdb_transaction_commit;
1232         result->transaction_cancel = db_ctdb_transaction_cancel;
1233
1234         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1235                  name, db_ctdb->db_id));
1236
1237         return result;
1238 }
1239 #endif