fixed a segfault on the ctdb destructor code
[ira/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34         uint32_t nesting;
35         bool nested_cancel;
36 };
37
38 struct db_ctdb_ctx {
39         struct db_context *db;
40         struct tdb_wrap *wtdb;
41         uint32 db_id;
42         struct db_ctdb_transaction_handle *transaction;
43 };
44
45 struct db_ctdb_rec {
46         struct db_ctdb_ctx *ctdb_ctx;
47         struct ctdb_ltdb_header header;
48 };
49
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51                                                TALLOC_CTX *mem_ctx,
52                                                TDB_DATA key,
53                                                bool persistent);
54
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
56 {
57         NTSTATUS status;
58         enum TDB_ERROR tret = tdb_error(tdb);
59
60         switch (tret) {
61         case TDB_ERR_EXISTS:
62                 status = NT_STATUS_OBJECT_NAME_COLLISION;
63                 break;
64         case TDB_ERR_NOEXIST:
65                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66                 break;
67         default:
68                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69                 break;
70         }
71
72         return status;
73 }
74
75
76
77 /*
78   form a ctdb_rec_data record from a key/data pair
79
80   note that header may be NULL. If not NULL then it is included in the data portion
81   of the record
82  */
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
84                                                   TDB_DATA key, 
85                                                   struct ctdb_ltdb_header *header,
86                                                   TDB_DATA data)
87 {
88         size_t length;
89         struct ctdb_rec_data *d;
90
91         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
92                 data.dsize + (header?sizeof(*header):0);
93         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94         if (d == NULL) {
95                 return NULL;
96         }
97         d->length = length;
98         d->reqid = reqid;
99         d->keylen = key.dsize;
100         memcpy(&d->data[0], key.dptr, key.dsize);
101         if (header) {
102                 d->datalen = data.dsize + sizeof(*header);
103                 memcpy(&d->data[key.dsize], header, sizeof(*header));
104                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105         } else {
106                 d->datalen = data.dsize;
107                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
108         }
109         return d;
110 }
111
112
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
115                                                struct ctdb_marshall_buffer *m,
116                                                uint64_t db_id,
117                                                uint32_t reqid,
118                                                TDB_DATA key,
119                                                struct ctdb_ltdb_header *header,
120                                                TDB_DATA data)
121 {
122         struct ctdb_rec_data *r;
123         size_t m_size, r_size;
124         struct ctdb_marshall_buffer *m2;
125
126         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
127         if (r == NULL) {
128                 talloc_free(m);
129                 return NULL;
130         }
131
132         if (m == NULL) {
133                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135                 if (m == NULL) {
136                         return NULL;
137                 }
138                 m->db_id = db_id;
139         }
140
141         m_size = talloc_get_size(m);
142         r_size = talloc_get_size(r);
143
144         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145                 mem_ctx, m,  m_size + r_size);
146         if (m2 == NULL) {
147                 talloc_free(m);
148                 return NULL;
149         }
150
151         memcpy(m_size + (uint8_t *)m2, r, r_size);
152
153         talloc_free(r);
154
155         m2->count++;
156
157         return m2;
158 }
159
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
162 {
163         TDB_DATA data;
164         data.dptr = (uint8_t *)m;
165         data.dsize = talloc_get_size(m);
166         return data;
167 }
168
169 /* 
170    loop over a marshalling buffer 
171
172      - pass r==NULL to start
173      - loop the number of times indicated by m->count
174 */
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176                                                      uint32_t *reqid,
177                                                      struct ctdb_ltdb_header *header,
178                                                      TDB_DATA *key, TDB_DATA *data)
179 {
180         if (r == NULL) {
181                 r = (struct ctdb_rec_data *)&m->data[0];
182         } else {
183                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184         }
185
186         if (reqid != NULL) {
187                 *reqid = r->reqid;
188         }
189
190         if (key != NULL) {
191                 key->dptr   = &r->data[0];
192                 key->dsize  = r->keylen;
193         }
194         if (data != NULL) {
195                 data->dptr  = &r->data[r->keylen];
196                 data->dsize = r->datalen;
197                 if (header != NULL) {
198                         data->dptr += sizeof(*header);
199                         data->dsize -= sizeof(*header);
200                 }
201         }
202
203         if (header != NULL) {
204                 if (r->datalen < sizeof(*header)) {
205                         return NULL;
206                 }
207                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208         }
209
210         return r;
211 }
212
213
214
215 /* start a transaction on a database */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
217 {
218         tdb_transaction_cancel(h->ctx->wtdb->tdb);
219         return 0;
220 }
221
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
224 {
225         struct db_record *rh;
226         TDB_DATA key;
227         TALLOC_CTX *tmp_ctx;
228         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
229         int ret;
230         struct db_ctdb_ctx *ctx = h->ctx;
231         TDB_DATA data;
232
233         key.dptr = (uint8_t *)discard_const(keyname);
234         key.dsize = strlen(keyname);
235
236 again:
237         tmp_ctx = talloc_new(h);
238
239         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
240         if (rh == NULL) {
241                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245         talloc_free(rh);
246
247         ret = tdb_transaction_start(ctx->wtdb->tdb);
248         if (ret != 0) {
249                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250                 talloc_free(tmp_ctx);
251                 return -1;
252         }
253
254         data = tdb_fetch(ctx->wtdb->tdb, key);
255         if ((data.dptr == NULL) ||
256             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258                 SAFE_FREE(data.dptr);
259                 tdb_transaction_cancel(ctx->wtdb->tdb);
260                 talloc_free(tmp_ctx);
261                 goto again;
262         }
263
264         SAFE_FREE(data.dptr);
265         talloc_free(tmp_ctx);
266
267         return 0;
268 }
269
270
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context *db)
273 {
274         struct db_ctdb_transaction_handle *h;
275         int ret;
276         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
277                                                         struct db_ctdb_ctx);
278
279         if (!db->persistent) {
280                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
281                          ctx->db_id));
282                 return -1;
283         }
284
285         if (ctx->transaction) {
286                 ctx->transaction->nesting++;
287                 return 0;
288         }
289
290         h = talloc_zero(db, struct db_ctdb_transaction_handle);
291         if (h == NULL) {
292                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
293                 return -1;
294         }
295
296         h->ctx = ctx;
297
298         ret = db_ctdb_transaction_fetch_start(h);
299         if (ret != 0) {
300                 talloc_free(h);
301                 return -1;
302         }
303
304         talloc_set_destructor(h, db_ctdb_transaction_destructor);
305
306         ctx->transaction = h;
307
308         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
309
310         return 0;
311 }
312
313
314
315 /*
316   fetch a record inside a transaction
317  */
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
319                                      TALLOC_CTX *mem_ctx, 
320                                      TDB_DATA key, TDB_DATA *data)
321 {
322         struct db_ctdb_transaction_handle *h = db->transaction;
323
324         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
325
326         if (data->dptr != NULL) {
327                 uint8_t *oldptr = (uint8_t *)data->dptr;
328                 data->dsize -= sizeof(struct ctdb_ltdb_header);
329                 if (data->dsize == 0) {
330                         data->dptr = NULL;
331                 } else {
332                         data->dptr = (uint8 *)
333                                 talloc_memdup(
334                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
335                                         data->dsize);
336                 }
337                 SAFE_FREE(oldptr);
338                 if (data->dptr == NULL && data->dsize != 0) {
339                         return -1;
340                 }
341         }
342
343         if (!h->in_replay) {
344                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345                 if (h->m_all == NULL) {
346                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
347                         data->dsize = 0;
348                         talloc_free(data->dptr);
349                         return -1;
350                 }
351         }
352
353         return 0;
354 }
355
356
357 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
359
360 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
361                                                           TALLOC_CTX *mem_ctx,
362                                                           TDB_DATA key)
363 {
364         struct db_record *result;
365         TDB_DATA ctdb_data;
366
367         if (!(result = talloc(mem_ctx, struct db_record))) {
368                 DEBUG(0, ("talloc failed\n"));
369                 return NULL;
370         }
371
372         result->private_data = ctx->transaction;
373
374         result->key.dsize = key.dsize;
375         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376         if (result->key.dptr == NULL) {
377                 DEBUG(0, ("talloc failed\n"));
378                 TALLOC_FREE(result);
379                 return NULL;
380         }
381
382         result->store = db_ctdb_store_transaction;
383         result->delete_rec = db_ctdb_delete_transaction;
384
385         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386         if (ctdb_data.dptr == NULL) {
387                 /* create the record */
388                 result->value = tdb_null;
389                 return result;
390         }
391
392         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393         result->value.dptr = NULL;
394
395         if ((result->value.dsize != 0)
396             && !(result->value.dptr = (uint8 *)talloc_memdup(
397                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398                          result->value.dsize))) {
399                 DEBUG(0, ("talloc failed\n"));
400                 TALLOC_FREE(result);
401         }
402
403         SAFE_FREE(ctdb_data.dptr);
404
405         return result;
406 }
407
408 static int db_ctdb_record_destructor(struct db_record **recp)
409 {
410         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
411         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
412                 rec->private_data, struct db_ctdb_transaction_handle);
413         int ret = h->ctx->db->transaction_commit(h->ctx->db);
414         if (ret != 0) {
415                 DEBUG(0,(__location__ " transaction_commit failed\n"));
416         }
417         return 0;
418 }
419
420 /*
421   auto-create a transaction for persistent databases
422  */
423 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
424                                                          TALLOC_CTX *mem_ctx,
425                                                          TDB_DATA key)
426 {
427         int res;
428         struct db_record *rec, **recp;
429
430         res = db_ctdb_transaction_start(ctx->db);
431         if (res == -1) {
432                 return NULL;
433         }
434
435         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
436         if (rec == NULL) {
437                 ctx->db->transaction_cancel(ctx->db);           
438                 return NULL;
439         }
440
441         /* destroy this transaction when we release the lock */
442         recp = talloc(rec, struct db_record *);
443         if (recp == NULL) {
444                 ctx->db->transaction_cancel(ctx->db);
445                 return NULL;
446         }
447         *recp = rec;
448         talloc_set_destructor(recp, db_ctdb_record_destructor);
449         return rec;
450 }
451
452
453 /*
454   stores a record inside a transaction
455  */
456 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
457                                      TDB_DATA key, TDB_DATA data)
458 {
459         TALLOC_CTX *tmp_ctx = talloc_new(h);
460         int ret;
461         TDB_DATA rec;
462         struct ctdb_ltdb_header header;
463
464         /* we need the header so we can update the RSN */
465         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
466         if (rec.dptr == NULL) {
467                 /* the record doesn't exist - create one with us as dmaster.
468                    This is only safe because we are in a transaction and this
469                    is a persistent database */
470                 ZERO_STRUCT(header);
471                 header.dmaster = get_my_vnn();
472         } else {
473                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
474                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
475                 /* a special case, we are writing the same data that is there now */
476                 if (data.dsize == rec.dsize &&
477                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
478                         SAFE_FREE(rec.dptr);
479                         talloc_free(tmp_ctx);
480                         return 0;
481                 }
482                 SAFE_FREE(rec.dptr);
483         }
484
485         header.rsn++;
486
487         if (!h->in_replay) {
488                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
489                 if (h->m_all == NULL) {
490                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
491                         talloc_free(tmp_ctx);
492                         return -1;
493                 }
494         }
495
496         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
497         if (h->m_write == NULL) {
498                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
499                 talloc_free(tmp_ctx);
500                 return -1;
501         }
502
503         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
504         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
505         if (rec.dptr == NULL) {
506                 DEBUG(0,(__location__ " Failed to alloc record\n"));
507                 talloc_free(tmp_ctx);
508                 return -1;
509         }
510         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
511         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
512
513         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
514
515         talloc_free(tmp_ctx);
516
517         return ret;
518 }
519
520
521 /* 
522    a record store inside a transaction
523  */
524 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
525 {
526         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
527                 rec->private_data, struct db_ctdb_transaction_handle);
528         int ret;
529
530         ret = db_ctdb_transaction_store(h, rec->key, data);
531         if (ret != 0) {
532                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
533         }
534         return NT_STATUS_OK;
535 }
536
537 /* 
538    a record delete inside a transaction
539  */
540 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
541 {
542         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
543                 rec->private_data, struct db_ctdb_transaction_handle);
544         int ret;
545
546         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
547         if (ret != 0) {
548                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
549         }
550         return NT_STATUS_OK;
551 }
552
553
554 /*
555   replay a transaction
556  */
557 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
558 {
559         int ret, i;
560         struct ctdb_rec_data *rec = NULL;
561
562         h->in_replay = true;
563         talloc_free(h->m_write);
564         h->m_write = NULL;
565
566         ret = db_ctdb_transaction_fetch_start(h);
567         if (ret != 0) {
568                 return ret;
569         }
570
571         for (i=0;i<h->m_all->count;i++) {
572                 TDB_DATA key, data;
573
574                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
575                 if (rec == NULL) {
576                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
577                         goto failed;
578                 }
579
580                 if (rec->reqid == 0) {
581                         /* its a store */
582                         if (db_ctdb_transaction_store(h, key, data) != 0) {
583                                 goto failed;
584                         }
585                 } else {
586                         TDB_DATA data2;
587                         TALLOC_CTX *tmp_ctx = talloc_new(h);
588
589                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
590                                 talloc_free(tmp_ctx);
591                                 goto failed;
592                         }
593                         if (data2.dsize != data.dsize ||
594                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
595                                 /* the record has changed on us - we have to give up */
596                                 talloc_free(tmp_ctx);
597                                 goto failed;
598                         }
599                         talloc_free(tmp_ctx);
600                 }
601         }
602
603         return 0;
604
605 failed:
606         tdb_transaction_cancel(h->ctx->wtdb->tdb);
607         return -1;
608 }
609
610
611 /*
612   commit a transaction
613  */
614 static int db_ctdb_transaction_commit(struct db_context *db)
615 {
616         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
617                                                         struct db_ctdb_ctx);
618         NTSTATUS rets;
619         int ret;
620         int status;
621         int retries = 0;
622         struct db_ctdb_transaction_handle *h = ctx->transaction;
623         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
624
625         if (h == NULL) {
626                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
627                 return -1;
628         }
629
630         if (h->nested_cancel) {
631                 db->transaction_cancel(db);
632                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
633                 return -1;
634         }
635
636         if (h->nesting != 0) {
637                 h->nesting--;
638                 return 0;
639         }
640
641         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
642
643         talloc_set_destructor(h, NULL);
644
645         /* our commit strategy is quite complex.
646
647            - we first try to commit the changes to all other nodes
648
649            - if that works, then we commit locally and we are done
650
651            - if a commit on another node fails, then we need to cancel
652              the transaction, then restart the transaction (thus
653              opening a window of time for a pending recovery to
654              complete), then replay the transaction, checking all the
655              reads and writes (checking that reads give the same data,
656              and writes succeed). Then we retry the transaction to the
657              other nodes
658         */
659
660 again:
661         if (h->m_write == NULL) {
662                 /* no changes were made, potentially after a retry */
663                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
664                 talloc_free(h);
665                 ctx->transaction = NULL;
666                 return 0;
667         }
668
669         /* tell ctdbd to commit to the other nodes */
670         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
671                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
672                                    h->ctx->db_id, 0,
673                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
674         if (!NT_STATUS_IS_OK(rets) || status != 0) {
675                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
676                 sleep(1);
677
678                 if (!NT_STATUS_IS_OK(rets)) {
679                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
680                 } else {
681                         /* work out what error code we will give if we 
682                            have to fail the operation */
683                         switch ((enum ctdb_trans2_commit_error)status) {
684                         case CTDB_TRANS2_COMMIT_SUCCESS:
685                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
686                         case CTDB_TRANS2_COMMIT_TIMEOUT:
687                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
688                                 break;
689                         case CTDB_TRANS2_COMMIT_ALLFAIL:
690                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
691                                 break;
692                         }
693                 }
694
695                 if (++retries == 5) {
696                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
697                                  h->ctx->db_id, retries, (unsigned)failure_control));
698                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
699                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
700                                             tdb_null, NULL, NULL, NULL);
701                         h->ctx->transaction = NULL;
702                         talloc_free(h);
703                         ctx->transaction = NULL;
704                         return -1;                      
705                 }
706
707                 if (ctdb_replay_transaction(h) != 0) {
708                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
709                                  (unsigned)failure_control));
710                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
711                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
712                                             tdb_null, NULL, NULL, NULL);
713                         h->ctx->transaction = NULL;
714                         talloc_free(h);
715                         ctx->transaction = NULL;
716                         return -1;
717                 }
718                 goto again;
719         } else {
720                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
721         }
722
723         /* do the real commit locally */
724         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
725         if (ret != 0) {
726                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
727                          (unsigned)failure_control));
728                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
729                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
730                 h->ctx->transaction = NULL;
731                 talloc_free(h);
732                 return ret;
733         }
734
735         /* tell ctdbd that we are finished with our local commit */
736         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
737                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
738                             tdb_null, NULL, NULL, NULL);
739         h->ctx->transaction = NULL;
740         talloc_free(h);
741         return 0;
742 }
743
744
745 /*
746   cancel a transaction
747  */
748 static int db_ctdb_transaction_cancel(struct db_context *db)
749 {
750         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
751                                                         struct db_ctdb_ctx);
752         struct db_ctdb_transaction_handle *h = ctx->transaction;
753
754         if (h == NULL) {
755                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
756                 return -1;
757         }
758
759         if (h->nesting != 0) {
760                 h->nesting--;
761                 h->nested_cancel = true;
762                 return 0;
763         }
764
765         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
766
767         ctx->transaction = NULL;
768         talloc_free(h);
769         return 0;
770 }
771
772
773 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
774 {
775         struct db_ctdb_rec *crec = talloc_get_type_abort(
776                 rec->private_data, struct db_ctdb_rec);
777         TDB_DATA cdata;
778         int ret;
779
780         cdata.dsize = sizeof(crec->header) + data.dsize;
781
782         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
783                 return NT_STATUS_NO_MEMORY;
784         }
785
786         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
787         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
788
789         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
790
791         SAFE_FREE(cdata.dptr);
792
793         return (ret == 0) ? NT_STATUS_OK
794                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
795 }
796
797
798
799 static NTSTATUS db_ctdb_delete(struct db_record *rec)
800 {
801         TDB_DATA data;
802
803         /*
804          * We have to store the header with empty data. TODO: Fix the
805          * tdb-level cleanup
806          */
807
808         ZERO_STRUCT(data);
809
810         return db_ctdb_store(rec, data, 0);
811
812 }
813
814 static int db_ctdb_record_destr(struct db_record* data)
815 {
816         struct db_ctdb_rec *crec = talloc_get_type_abort(
817                 data->private_data, struct db_ctdb_rec);
818
819         DEBUG(10, (DEBUGLEVEL > 10
820                    ? "Unlocking db %u key %s\n"
821                    : "Unlocking db %u key %.20s\n",
822                    (int)crec->ctdb_ctx->db_id,
823                    hex_encode(data, (unsigned char *)data->key.dptr,
824                               data->key.dsize)));
825
826         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
827                 DEBUG(0, ("tdb_chainunlock failed\n"));
828                 return -1;
829         }
830
831         return 0;
832 }
833
834 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
835                                                TALLOC_CTX *mem_ctx,
836                                                TDB_DATA key,
837                                                bool persistent)
838 {
839         struct db_record *result;
840         struct db_ctdb_rec *crec;
841         NTSTATUS status;
842         TDB_DATA ctdb_data;
843         int migrate_attempts = 0;
844
845         if (!(result = talloc(mem_ctx, struct db_record))) {
846                 DEBUG(0, ("talloc failed\n"));
847                 return NULL;
848         }
849
850         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
851                 DEBUG(0, ("talloc failed\n"));
852                 TALLOC_FREE(result);
853                 return NULL;
854         }
855
856         result->private_data = (void *)crec;
857         crec->ctdb_ctx = ctx;
858
859         result->key.dsize = key.dsize;
860         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
861         if (result->key.dptr == NULL) {
862                 DEBUG(0, ("talloc failed\n"));
863                 TALLOC_FREE(result);
864                 return NULL;
865         }
866
867         /*
868          * Do a blocking lock on the record
869          */
870 again:
871
872         if (DEBUGLEVEL >= 10) {
873                 char *keystr = hex_encode(result, key.dptr, key.dsize);
874                 DEBUG(10, (DEBUGLEVEL > 10
875                            ? "Locking db %u key %s\n"
876                            : "Locking db %u key %.20s\n",
877                            (int)crec->ctdb_ctx->db_id, keystr));
878                 TALLOC_FREE(keystr);
879         }
880
881         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
882                 DEBUG(3, ("tdb_chainlock failed\n"));
883                 TALLOC_FREE(result);
884                 return NULL;
885         }
886
887         result->store = db_ctdb_store;
888         result->delete_rec = db_ctdb_delete;
889         talloc_set_destructor(result, db_ctdb_record_destr);
890
891         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
892
893         /*
894          * See if we have a valid record and we are the dmaster. If so, we can
895          * take the shortcut and just return it.
896          */
897
898         if ((ctdb_data.dptr == NULL) ||
899             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
900             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
901 #if 0
902             || (random() % 2 != 0)
903 #endif
904 ) {
905                 SAFE_FREE(ctdb_data.dptr);
906                 tdb_chainunlock(ctx->wtdb->tdb, key);
907                 talloc_set_destructor(result, NULL);
908
909                 migrate_attempts += 1;
910
911                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
912                            ctdb_data.dptr, ctdb_data.dptr ?
913                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
914                            get_my_vnn()));
915
916                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
917                 if (!NT_STATUS_IS_OK(status)) {
918                         DEBUG(5, ("ctdb_migrate failed: %s\n",
919                                   nt_errstr(status)));
920                         TALLOC_FREE(result);
921                         return NULL;
922                 }
923                 /* now its migrated, try again */
924                 goto again;
925         }
926
927         if (migrate_attempts > 10) {
928                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
929                           migrate_attempts));
930         }
931
932         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
933
934         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
935         result->value.dptr = NULL;
936
937         if ((result->value.dsize != 0)
938             && !(result->value.dptr = (uint8 *)talloc_memdup(
939                          result, ctdb_data.dptr + sizeof(crec->header),
940                          result->value.dsize))) {
941                 DEBUG(0, ("talloc failed\n"));
942                 TALLOC_FREE(result);
943         }
944
945         SAFE_FREE(ctdb_data.dptr);
946
947         return result;
948 }
949
950 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
951                                               TALLOC_CTX *mem_ctx,
952                                               TDB_DATA key)
953 {
954         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
955                                                         struct db_ctdb_ctx);
956
957         if (ctx->transaction != NULL) {
958                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
959         }
960
961         if (db->persistent) {
962                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
963         }
964
965         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
966 }
967
968 /*
969   fetch (unlocked, no migration) operation on ctdb
970  */
971 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
972                          TDB_DATA key, TDB_DATA *data)
973 {
974         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
975                                                         struct db_ctdb_ctx);
976         NTSTATUS status;
977         TDB_DATA ctdb_data;
978
979         if (ctx->transaction) {
980                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
981         }
982
983         /* try a direct fetch */
984         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
985
986         /*
987          * See if we have a valid record and we are the dmaster. If so, we can
988          * take the shortcut and just return it.
989          * we bypass the dmaster check for persistent databases
990          */
991         if ((ctdb_data.dptr != NULL) &&
992             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
993             (db->persistent ||
994              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
995                 /* we are the dmaster - avoid the ctdb protocol op */
996
997                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
998                 if (data->dsize == 0) {
999                         SAFE_FREE(ctdb_data.dptr);
1000                         data->dptr = NULL;
1001                         return 0;
1002                 }
1003
1004                 data->dptr = (uint8 *)talloc_memdup(
1005                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1006                         data->dsize);
1007
1008                 SAFE_FREE(ctdb_data.dptr);
1009
1010                 if (data->dptr == NULL) {
1011                         return -1;
1012                 }
1013                 return 0;
1014         }
1015
1016         SAFE_FREE(ctdb_data.dptr);
1017
1018         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1019         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1020         if (!NT_STATUS_IS_OK(status)) {
1021                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1022                 return -1;
1023         }
1024
1025         return 0;
1026 }
1027
1028 struct traverse_state {
1029         struct db_context *db;
1030         int (*fn)(struct db_record *rec, void *private_data);
1031         void *private_data;
1032 };
1033
1034 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1035 {
1036         struct traverse_state *state = (struct traverse_state *)private_data;
1037         struct db_record *rec;
1038         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1039         /* we have to give them a locked record to prevent races */
1040         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1041         if (rec && rec->value.dsize > 0) {
1042                 state->fn(rec, state->private_data);
1043         }
1044         talloc_free(tmp_ctx);
1045 }
1046
1047 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1048                                         void *private_data)
1049 {
1050         struct traverse_state *state = (struct traverse_state *)private_data;
1051         struct db_record *rec;
1052         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1053         int ret = 0;
1054         /* we have to give them a locked record to prevent races */
1055         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1056         if (rec && rec->value.dsize > 0) {
1057                 ret = state->fn(rec, state->private_data);
1058         }
1059         talloc_free(tmp_ctx);
1060         return ret;
1061 }
1062
1063 static int db_ctdb_traverse(struct db_context *db,
1064                             int (*fn)(struct db_record *rec,
1065                                       void *private_data),
1066                             void *private_data)
1067 {
1068         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1069                                                         struct db_ctdb_ctx);
1070         struct traverse_state state;
1071
1072         state.db = db;
1073         state.fn = fn;
1074         state.private_data = private_data;
1075
1076         if (db->persistent) {
1077                 /* for persistent databases we don't need to do a ctdb traverse,
1078                    we can do a faster local traverse */
1079                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1080         }
1081
1082
1083         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1084         return 0;
1085 }
1086
1087 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1088 {
1089         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1090 }
1091
1092 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1093 {
1094         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1095 }
1096
1097 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1098 {
1099         struct traverse_state *state = (struct traverse_state *)private_data;
1100         struct db_record rec;
1101         rec.key = key;
1102         rec.value = data;
1103         rec.store = db_ctdb_store_deny;
1104         rec.delete_rec = db_ctdb_delete_deny;
1105         rec.private_data = state->db;
1106         state->fn(&rec, state->private_data);
1107 }
1108
1109 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1110                                         void *private_data)
1111 {
1112         struct traverse_state *state = (struct traverse_state *)private_data;
1113         struct db_record rec;
1114         rec.key = kbuf;
1115         rec.value = dbuf;
1116         rec.store = db_ctdb_store_deny;
1117         rec.delete_rec = db_ctdb_delete_deny;
1118         rec.private_data = state->db;
1119
1120         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1121                 /* a deleted record */
1122                 return 0;
1123         }
1124         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1125         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1126
1127         return state->fn(&rec, state->private_data);
1128 }
1129
1130 static int db_ctdb_traverse_read(struct db_context *db,
1131                                  int (*fn)(struct db_record *rec,
1132                                            void *private_data),
1133                                  void *private_data)
1134 {
1135         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1136                                                         struct db_ctdb_ctx);
1137         struct traverse_state state;
1138
1139         state.db = db;
1140         state.fn = fn;
1141         state.private_data = private_data;
1142
1143         if (db->persistent) {
1144                 /* for persistent databases we don't need to do a ctdb traverse,
1145                    we can do a faster local traverse */
1146                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1147         }
1148
1149         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1150         return 0;
1151 }
1152
1153 static int db_ctdb_get_seqnum(struct db_context *db)
1154 {
1155         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1156                                                         struct db_ctdb_ctx);
1157         return tdb_get_seqnum(ctx->wtdb->tdb);
1158 }
1159
1160 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1161                                 const char *name,
1162                                 int hash_size, int tdb_flags,
1163                                 int open_flags, mode_t mode)
1164 {
1165         struct db_context *result;
1166         struct db_ctdb_ctx *db_ctdb;
1167         char *db_path;
1168
1169         if (!lp_clustering()) {
1170                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1171                 return NULL;
1172         }
1173
1174         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1175                 DEBUG(0, ("talloc failed\n"));
1176                 TALLOC_FREE(result);
1177                 return NULL;
1178         }
1179
1180         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1181                 DEBUG(0, ("talloc failed\n"));
1182                 TALLOC_FREE(result);
1183                 return NULL;
1184         }
1185
1186         db_ctdb->transaction = NULL;
1187         db_ctdb->db = result;
1188
1189         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1190                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1191                 TALLOC_FREE(result);
1192                 return NULL;
1193         }
1194
1195         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1196
1197         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1198
1199         /* only pass through specific flags */
1200         tdb_flags &= TDB_SEQNUM;
1201
1202         /* honor permissions if user has specified O_CREAT */
1203         if (open_flags & O_CREAT) {
1204                 chmod(db_path, mode);
1205         }
1206
1207         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1208         if (db_ctdb->wtdb == NULL) {
1209                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1210                 TALLOC_FREE(result);
1211                 return NULL;
1212         }
1213         talloc_free(db_path);
1214
1215         result->private_data = (void *)db_ctdb;
1216         result->fetch_locked = db_ctdb_fetch_locked;
1217         result->fetch = db_ctdb_fetch;
1218         result->traverse = db_ctdb_traverse;
1219         result->traverse_read = db_ctdb_traverse_read;
1220         result->get_seqnum = db_ctdb_get_seqnum;
1221         result->transaction_start = db_ctdb_transaction_start;
1222         result->transaction_commit = db_ctdb_transaction_commit;
1223         result->transaction_cancel = db_ctdb_transaction_cancel;
1224
1225         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1226                  name, db_ctdb->db_id));
1227
1228         return result;
1229 }
1230 #endif