use CTDB_CONTROL_TRANS2_COMMIT_RETRY to prevent the counter getting
[nivanova/samba-autobuild/.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct db_context *db;
38         struct tdb_wrap *wtdb;
39         uint32 db_id;
40         struct db_ctdb_transaction_handle *transaction;
41 };
42
43 struct db_ctdb_rec {
44         struct db_ctdb_ctx *ctdb_ctx;
45         struct ctdb_ltdb_header header;
46 };
47
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
49                                                TALLOC_CTX *mem_ctx,
50                                                TDB_DATA key,
51                                                bool persistent);
52
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
54 {
55         NTSTATUS status;
56         enum TDB_ERROR tret = tdb_error(tdb);
57
58         switch (tret) {
59         case TDB_ERR_EXISTS:
60                 status = NT_STATUS_OBJECT_NAME_COLLISION;
61                 break;
62         case TDB_ERR_NOEXIST:
63                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
64                 break;
65         default:
66                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
67                 break;
68         }
69
70         return status;
71 }
72
73
74
75 /*
76   form a ctdb_rec_data record from a key/data pair
77   
78   note that header may be NULL. If not NULL then it is included in the data portion
79   of the record
80  */
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
82                                                   TDB_DATA key, 
83                                                   struct ctdb_ltdb_header *header,
84                                                   TDB_DATA data)
85 {
86         size_t length;
87         struct ctdb_rec_data *d;
88
89         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
90                 data.dsize + (header?sizeof(*header):0);
91         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
92         if (d == NULL) {
93                 return NULL;
94         }
95         d->length = length;
96         d->reqid = reqid;
97         d->keylen = key.dsize;
98         memcpy(&d->data[0], key.dptr, key.dsize);
99         if (header) {
100                 d->datalen = data.dsize + sizeof(*header);
101                 memcpy(&d->data[key.dsize], header, sizeof(*header));
102                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103         } else {
104                 d->datalen = data.dsize;
105                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
106         }
107         return d;
108 }
109
110
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
113                                                struct ctdb_marshall_buffer *m,
114                                                uint64_t db_id,
115                                                uint32_t reqid,
116                                                TDB_DATA key,
117                                                struct ctdb_ltdb_header *header,
118                                                TDB_DATA data)
119 {
120         struct ctdb_rec_data *r;
121         size_t m_size, r_size;
122         struct ctdb_marshall_buffer *m2;
123
124         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
125         if (r == NULL) {
126                 talloc_free(m);
127                 return NULL;
128         }
129
130         if (m == NULL) {
131                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
132                 if (m == NULL) {
133                         return NULL;
134                 }
135                 m->db_id = db_id;
136         }
137
138         m_size = talloc_get_size(m);
139         r_size = talloc_get_size(r);
140
141         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
142         if (m2 == NULL) {
143                 talloc_free(m);
144                 return NULL;
145         }
146
147         memcpy(m_size + (uint8_t *)m2, r, r_size);
148
149         talloc_free(r);
150
151         m2->count++;
152
153         return m2;
154 }
155
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
158 {
159         TDB_DATA data;
160         data.dptr = (uint8_t *)m;
161         data.dsize = talloc_get_size(m);
162         return data;
163 }
164
165 /* 
166    loop over a marshalling buffer 
167    
168      - pass r==NULL to start
169      - loop the number of times indicated by m->count
170 */
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172                                                      uint32_t *reqid,
173                                                      struct ctdb_ltdb_header *header,
174                                                      TDB_DATA *key, TDB_DATA *data)
175 {
176         if (r == NULL) {
177                 r = (struct ctdb_rec_data *)&m->data[0];
178         } else {
179                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
180         }
181
182         if (reqid != NULL) {
183                 *reqid = r->reqid;
184         }
185         
186         if (key != NULL) {
187                 key->dptr   = &r->data[0];
188                 key->dsize  = r->keylen;
189         }
190         if (data != NULL) {
191                 data->dptr  = &r->data[r->keylen];
192                 data->dsize = r->datalen;
193                 if (header != NULL) {
194                         data->dptr += sizeof(*header);
195                         data->dsize -= sizeof(*header);
196                 }
197         }
198
199         if (header != NULL) {
200                 if (r->datalen < sizeof(*header)) {
201                         return NULL;
202                 }
203                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
204         }
205
206         return r;
207 }
208
209
210
211 /* start a transaction on a database */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 {
214         tdb_transaction_cancel(h->ctx->wtdb->tdb);
215         return 0;
216 }
217
218 /* start a transaction on a database */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 {
221         struct db_record *rh;
222         TDB_DATA key;
223         TALLOC_CTX *tmp_ctx;
224         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225         int ret;
226         struct db_ctdb_ctx *ctx = h->ctx;
227         TDB_DATA data;
228
229         key.dptr = discard_const(keyname);
230         key.dsize = strlen(keyname);
231
232 again:
233         tmp_ctx = talloc_new(h);
234
235         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236         if (rh == NULL) {
237                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
238                 talloc_free(tmp_ctx);
239                 return -1;
240         }
241         talloc_free(rh);
242
243         ret = tdb_transaction_start(ctx->wtdb->tdb);
244         if (ret != 0) {
245                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246                 talloc_free(tmp_ctx);
247                 return -1;
248         }
249
250         data = tdb_fetch(ctx->wtdb->tdb, key);
251         if ((data.dptr == NULL) ||
252             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254                 SAFE_FREE(data.dptr);
255                 tdb_transaction_cancel(ctx->wtdb->tdb);
256                 talloc_free(tmp_ctx);
257                 goto again;
258         }
259
260         SAFE_FREE(data.dptr);
261         talloc_free(tmp_ctx);
262
263         return 0;
264 }
265
266
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
269 {
270         struct db_ctdb_transaction_handle *h;
271         int ret;
272         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
273                                                         struct db_ctdb_ctx);
274
275         if (!db->persistent) {
276                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
277                          ctx->db_id));
278                 return -1;
279         }
280
281         if (ctx->transaction) {
282                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
283                 return -1;
284         }
285
286         h = talloc_zero(db, struct db_ctdb_transaction_handle);
287         if (h == NULL) {
288                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
289                 return -1;
290         }
291
292         h->ctx = ctx;
293
294         ret = db_ctdb_transaction_fetch_start(h);
295         if (ret != 0) {
296                 talloc_free(h);
297                 return -1;
298         }
299
300         talloc_set_destructor(h, db_ctdb_transaction_destructor);
301
302         ctx->transaction = h;
303
304         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
305
306         return 0;
307 }
308
309
310
311 /*
312   fetch a record inside a transaction
313  */
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
315                                      TALLOC_CTX *mem_ctx, 
316                                      TDB_DATA key, TDB_DATA *data)
317 {
318         struct db_ctdb_transaction_handle *h = db->transaction;
319
320         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321
322         if (data->dptr != NULL) {
323                 uint8_t *oldptr = (uint8_t *)data->dptr;
324                 data->dsize -= sizeof(struct ctdb_ltdb_header);
325                 if (data->dsize == 0) {
326                         data->dptr = NULL;
327                 } else {
328                         data->dptr = (uint8 *)
329                                 talloc_memdup(
330                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
331                                         data->dsize);
332                 }
333                 SAFE_FREE(oldptr);
334                 if (data->dptr == NULL && data->dsize != 0) {
335                         return -1;
336                 }
337         }
338
339         if (!h->in_replay) {
340                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341                 if (h->m_all == NULL) {
342                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
343                         data->dsize = 0;
344                         talloc_free(data->dptr);
345                         return -1;
346                 }
347         }
348
349         return 0;
350 }
351
352
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
355
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
357                                                           TALLOC_CTX *mem_ctx,
358                                                           TDB_DATA key)
359 {
360         struct db_record *result;
361         TDB_DATA ctdb_data;
362
363         if (!(result = talloc(mem_ctx, struct db_record))) {
364                 DEBUG(0, ("talloc failed\n"));
365                 return NULL;
366         }
367
368         result->private_data = ctx->transaction;
369
370         result->key.dsize = key.dsize;
371         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372         if (result->key.dptr == NULL) {
373                 DEBUG(0, ("talloc failed\n"));
374                 TALLOC_FREE(result);
375                 return NULL;
376         }
377
378         result->store = db_ctdb_store_transaction;
379         result->delete_rec = db_ctdb_delete_transaction;
380
381         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382         if (ctdb_data.dptr == NULL) {
383                 /* create the record */
384                 result->value = tdb_null;
385                 return result;
386         }
387
388         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389         result->value.dptr = NULL;
390
391         if ((result->value.dsize != 0)
392             && !(result->value.dptr = (uint8 *)talloc_memdup(
393                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394                          result->value.dsize))) {
395                 DEBUG(0, ("talloc failed\n"));
396                 TALLOC_FREE(result);
397         }
398
399         SAFE_FREE(ctdb_data.dptr);
400
401         return result;
402 }
403
404 static int db_ctdb_record_destructor(struct db_record *rec)
405 {
406         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407                 rec->private_data, struct db_ctdb_transaction_handle);
408         int ret = h->ctx->db->transaction_commit(h->ctx->db);
409         if (ret != 0) {
410                 DEBUG(0,(__location__ " transaction_commit failed\n"));
411         }
412         return 0;
413 }
414
415 /*
416   auto-create a transaction for persistent databases
417  */
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
419                                                          TALLOC_CTX *mem_ctx,
420                                                          TDB_DATA key)
421 {
422         int res;
423         struct db_record *rec;
424
425         res = db_ctdb_transaction_start(ctx->db);
426         if (res == -1) {
427                 return NULL;
428         }
429
430         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
431         if (rec == NULL) {
432                 ctx->db->transaction_cancel(ctx->db);           
433                 return NULL;
434         }
435
436         /* destroy this transaction when we release the lock */
437         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
438         return rec;
439 }
440
441
442 /*
443   stores a record inside a transaction
444  */
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
446                                      TDB_DATA key, TDB_DATA data)
447 {
448         TALLOC_CTX *tmp_ctx = talloc_new(h);
449         int ret;
450         TDB_DATA rec;
451         struct ctdb_ltdb_header header;
452
453         /* we need the header so we can update the RSN */
454         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455         if (rec.dptr == NULL) {
456                 /* the record doesn't exist - create one with us as dmaster.
457                    This is only safe because we are in a transaction and this
458                    is a persistent database */
459                 ZERO_STRUCT(header);
460                 header.dmaster = get_my_vnn();
461         } else {
462                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
463                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
464                 /* a special case, we are writing the same data that is there now */
465                 if (data.dsize == rec.dsize &&
466                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
467                         SAFE_FREE(rec.dptr);
468                         talloc_free(tmp_ctx);
469                         return 0;
470                 }
471                 SAFE_FREE(rec.dptr);
472         }
473
474         header.rsn++;
475
476         if (!h->in_replay) {
477                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
478                 if (h->m_all == NULL) {
479                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
480                         talloc_free(tmp_ctx);
481                         return -1;
482                 }
483         }
484                 
485         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
486         if (h->m_write == NULL) {
487                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
488                 talloc_free(tmp_ctx);
489                 return -1;
490         }
491         
492         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
493         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
494         if (rec.dptr == NULL) {
495                 DEBUG(0,(__location__ " Failed to alloc record\n"));
496                 talloc_free(tmp_ctx);
497                 return -1;
498         }
499         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
500         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
501
502         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
503
504         talloc_free(tmp_ctx);
505         
506         return ret;
507 }
508
509
510 /* 
511    a record store inside a transaction
512  */
513 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
514 {
515         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
516                 rec->private_data, struct db_ctdb_transaction_handle);
517         int ret;
518
519         ret = db_ctdb_transaction_store(h, rec->key, data);
520         if (ret != 0) {
521                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
522         }
523         return NT_STATUS_OK;
524 }
525
526 /* 
527    a record delete inside a transaction
528  */
529 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
530 {
531         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
532                 rec->private_data, struct db_ctdb_transaction_handle);
533         int ret;
534
535         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
536         if (ret != 0) {
537                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
538         }
539         return NT_STATUS_OK;
540 }
541
542
543 /*
544   replay a transaction
545  */
546 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
547 {
548         int ret, i;
549         struct ctdb_rec_data *rec = NULL;
550
551         h->in_replay = true;
552         talloc_free(h->m_write);
553         h->m_write = NULL;
554
555         ret = db_ctdb_transaction_fetch_start(h);
556         if (ret != 0) {
557                 return ret;
558         }
559
560         for (i=0;i<h->m_all->count;i++) {
561                 TDB_DATA key, data;
562
563                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
564                 if (rec == NULL) {
565                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
566                         goto failed;
567                 }
568
569                 if (rec->reqid == 0) {
570                         /* its a store */
571                         if (db_ctdb_transaction_store(h, key, data) != 0) {
572                                 goto failed;
573                         }
574                 } else {
575                         TDB_DATA data2;
576                         TALLOC_CTX *tmp_ctx = talloc_new(h);
577
578                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
579                                 talloc_free(tmp_ctx);
580                                 goto failed;
581                         }
582                         if (data2.dsize != data.dsize ||
583                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
584                                 /* the record has changed on us - we have to give up */
585                                 talloc_free(tmp_ctx);
586                                 goto failed;
587                         }
588                         talloc_free(tmp_ctx);
589                 }
590         }
591         
592         return 0;
593
594 failed:
595         tdb_transaction_cancel(h->ctx->wtdb->tdb);
596         return -1;
597 }
598
599
600 /*
601   commit a transaction
602  */
603 static int db_ctdb_transaction_commit(struct db_context *db)
604 {
605         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
606                                                         struct db_ctdb_ctx);
607         NTSTATUS rets;
608         int ret;
609         int status;
610         int retries = 0;
611         struct db_ctdb_transaction_handle *h = ctx->transaction;
612         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
613
614         if (h == NULL) {
615                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
616                 return -1;
617         }
618
619         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
620
621         talloc_set_destructor(h, NULL);
622
623         /* our commit strategy is quite complex.
624
625            - we first try to commit the changes to all other nodes
626
627            - if that works, then we commit locally and we are done
628
629            - if a commit on another node fails, then we need to cancel
630              the transaction, then restart the transaction (thus
631              opening a window of time for a pending recovery to
632              complete), then replay the transaction, checking all the
633              reads and writes (checking that reads give the same data,
634              and writes succeed). Then we retry the transaction to the
635              other nodes
636         */
637
638 again:
639         if (h->m_write == NULL) {
640                 /* no changes were made, potentially after a retry */
641                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
642                 talloc_free(h);
643                 ctx->transaction = NULL;
644                 return 0;
645         }
646
647         /* tell ctdbd to commit to the other nodes */
648         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
649                                    retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
650                                    h->ctx->db_id, 0,
651                                    db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
652         if (!NT_STATUS_IS_OK(rets) || status != 0) {
653                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
654                 sleep(1);
655
656                 if (!NT_STATUS_IS_OK(rets)) {
657                         failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
658                 } else {
659                         /* work out what error code we will give if we 
660                            have to fail the operation */
661                         switch ((enum ctdb_trans2_commit_error)status) {
662                         case CTDB_TRANS2_COMMIT_SUCCESS:
663                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
664                         case CTDB_TRANS2_COMMIT_TIMEOUT:
665                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
666                                 break;
667                         case CTDB_TRANS2_COMMIT_ALLFAIL:
668                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
669                                 break;
670                         }
671                 }
672
673                 if (++retries == 10) {
674                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
675                                  h->ctx->db_id, retries, (unsigned)failure_control));
676                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
677                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
678                                             tdb_null, NULL, NULL, NULL);
679                         h->ctx->transaction = NULL;
680                         talloc_free(h);
681                         ctx->transaction = NULL;
682                         return -1;                      
683                 }
684
685                 if (ctdb_replay_transaction(h) != 0) {
686                         DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
687                                  (unsigned)failure_control));
688                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
689                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
690                                             tdb_null, NULL, NULL, NULL);
691                         h->ctx->transaction = NULL;
692                         talloc_free(h);
693                         ctx->transaction = NULL;
694                         return -1;
695                 }
696                 goto again;
697         } else {
698                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
699         }
700
701         /* do the real commit locally */
702         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
703         if (ret != 0) {
704                 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
705                          (unsigned)failure_control));
706                 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
707                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
708                 h->ctx->transaction = NULL;
709                 talloc_free(h);
710                 return ret;
711         }
712
713         /* tell ctdbd that we are finished with our local commit */
714         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
715                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
716                             tdb_null, NULL, NULL, NULL);
717         h->ctx->transaction = NULL;
718         talloc_free(h);
719         return 0;
720 }
721
722
723 /*
724   cancel a transaction
725  */
726 static int db_ctdb_transaction_cancel(struct db_context *db)
727 {
728         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
729                                                         struct db_ctdb_ctx);
730         struct db_ctdb_transaction_handle *h = ctx->transaction;
731
732         if (h == NULL) {
733                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
734                 return -1;
735         }
736
737         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
738
739         ctx->transaction = NULL;
740         talloc_free(h);
741         return 0;
742 }
743
744
745 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
746 {
747         struct db_ctdb_rec *crec = talloc_get_type_abort(
748                 rec->private_data, struct db_ctdb_rec);
749         TDB_DATA cdata;
750         int ret;
751
752         cdata.dsize = sizeof(crec->header) + data.dsize;
753
754         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
755                 return NT_STATUS_NO_MEMORY;
756         }
757
758         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
759         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
760
761         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
762
763         SAFE_FREE(cdata.dptr);
764
765         return (ret == 0) ? NT_STATUS_OK
766                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
767 }
768
769
770
771 static NTSTATUS db_ctdb_delete(struct db_record *rec)
772 {
773         TDB_DATA data;
774
775         /*
776          * We have to store the header with empty data. TODO: Fix the
777          * tdb-level cleanup
778          */
779
780         ZERO_STRUCT(data);
781
782         return db_ctdb_store(rec, data, 0);
783
784 }
785
786 static int db_ctdb_record_destr(struct db_record* data)
787 {
788         struct db_ctdb_rec *crec = talloc_get_type_abort(
789                 data->private_data, struct db_ctdb_rec);
790
791         DEBUG(10, (DEBUGLEVEL > 10
792                    ? "Unlocking db %u key %s\n"
793                    : "Unlocking db %u key %.20s\n",
794                    (int)crec->ctdb_ctx->db_id,
795                    hex_encode(data, (unsigned char *)data->key.dptr,
796                               data->key.dsize)));
797
798         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
799                 DEBUG(0, ("tdb_chainunlock failed\n"));
800                 return -1;
801         }
802
803         return 0;
804 }
805
806 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
807                                                TALLOC_CTX *mem_ctx,
808                                                TDB_DATA key,
809                                                bool persistent)
810 {
811         struct db_record *result;
812         struct db_ctdb_rec *crec;
813         NTSTATUS status;
814         TDB_DATA ctdb_data;
815         int migrate_attempts = 0;
816
817         if (!(result = talloc(mem_ctx, struct db_record))) {
818                 DEBUG(0, ("talloc failed\n"));
819                 return NULL;
820         }
821
822         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
823                 DEBUG(0, ("talloc failed\n"));
824                 TALLOC_FREE(result);
825                 return NULL;
826         }
827
828         result->private_data = (void *)crec;
829         crec->ctdb_ctx = ctx;
830
831         result->key.dsize = key.dsize;
832         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
833         if (result->key.dptr == NULL) {
834                 DEBUG(0, ("talloc failed\n"));
835                 TALLOC_FREE(result);
836                 return NULL;
837         }
838
839         /*
840          * Do a blocking lock on the record
841          */
842 again:
843
844         if (DEBUGLEVEL >= 10) {
845                 char *keystr = hex_encode(result, key.dptr, key.dsize);
846                 DEBUG(10, (DEBUGLEVEL > 10
847                            ? "Locking db %u key %s\n"
848                            : "Locking db %u key %.20s\n",
849                            (int)crec->ctdb_ctx->db_id, keystr));
850                 TALLOC_FREE(keystr);
851         }
852         
853         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
854                 DEBUG(3, ("tdb_chainlock failed\n"));
855                 TALLOC_FREE(result);
856                 return NULL;
857         }
858
859         result->store = db_ctdb_store;
860         result->delete_rec = db_ctdb_delete;
861         talloc_set_destructor(result, db_ctdb_record_destr);
862
863         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
864
865         /*
866          * See if we have a valid record and we are the dmaster. If so, we can
867          * take the shortcut and just return it.
868          */
869
870         if ((ctdb_data.dptr == NULL) ||
871             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
872             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
873 #if 0
874             || (random() % 2 != 0)
875 #endif
876 ) {
877                 SAFE_FREE(ctdb_data.dptr);
878                 tdb_chainunlock(ctx->wtdb->tdb, key);
879                 talloc_set_destructor(result, NULL);
880
881                 migrate_attempts += 1;
882
883                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
884                            ctdb_data.dptr, ctdb_data.dptr ?
885                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
886                            get_my_vnn()));
887
888                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
889                 if (!NT_STATUS_IS_OK(status)) {
890                         DEBUG(5, ("ctdb_migrate failed: %s\n",
891                                   nt_errstr(status)));
892                         TALLOC_FREE(result);
893                         return NULL;
894                 }
895                 /* now its migrated, try again */
896                 goto again;
897         }
898
899         if (migrate_attempts > 10) {
900                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
901                           migrate_attempts));
902         }
903
904         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
905
906         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
907         result->value.dptr = NULL;
908
909         if ((result->value.dsize != 0)
910             && !(result->value.dptr = (uint8 *)talloc_memdup(
911                          result, ctdb_data.dptr + sizeof(crec->header),
912                          result->value.dsize))) {
913                 DEBUG(0, ("talloc failed\n"));
914                 TALLOC_FREE(result);
915         }
916
917         SAFE_FREE(ctdb_data.dptr);
918
919         return result;
920 }
921
922 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
923                                               TALLOC_CTX *mem_ctx,
924                                               TDB_DATA key)
925 {
926         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
927                                                         struct db_ctdb_ctx);
928
929         if (ctx->transaction != NULL) {
930                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
931         }
932
933         if (db->persistent) {
934                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
935         }
936
937         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
938 }
939
940 /*
941   fetch (unlocked, no migration) operation on ctdb
942  */
943 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
944                          TDB_DATA key, TDB_DATA *data)
945 {
946         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
947                                                         struct db_ctdb_ctx);
948         NTSTATUS status;
949         TDB_DATA ctdb_data;
950
951         if (ctx->transaction) {
952                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
953         }
954
955         /* try a direct fetch */
956         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
957
958         /*
959          * See if we have a valid record and we are the dmaster. If so, we can
960          * take the shortcut and just return it.
961          * we bypass the dmaster check for persistent databases
962          */
963         if ((ctdb_data.dptr != NULL) &&
964             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
965             (db->persistent ||
966              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
967                 /* we are the dmaster - avoid the ctdb protocol op */
968
969                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
970                 if (data->dsize == 0) {
971                         SAFE_FREE(ctdb_data.dptr);
972                         data->dptr = NULL;
973                         return 0;
974                 }
975
976                 data->dptr = (uint8 *)talloc_memdup(
977                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
978                         data->dsize);
979
980                 SAFE_FREE(ctdb_data.dptr);
981
982                 if (data->dptr == NULL) {
983                         return -1;
984                 }
985                 return 0;
986         }
987
988         SAFE_FREE(ctdb_data.dptr);
989
990         /* we weren't able to get it locally - ask ctdb to fetch it for us */
991         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
992         if (!NT_STATUS_IS_OK(status)) {
993                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
994                 return -1;
995         }
996
997         return 0;
998 }
999
1000 struct traverse_state {
1001         struct db_context *db;
1002         int (*fn)(struct db_record *rec, void *private_data);
1003         void *private_data;
1004 };
1005
1006 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1007 {
1008         struct traverse_state *state = (struct traverse_state *)private_data;
1009         struct db_record *rec;
1010         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1011         /* we have to give them a locked record to prevent races */
1012         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1013         if (rec && rec->value.dsize > 0) {
1014                 state->fn(rec, state->private_data);
1015         }
1016         talloc_free(tmp_ctx);
1017 }
1018
1019 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1020                                         void *private_data)
1021 {
1022         struct traverse_state *state = (struct traverse_state *)private_data;
1023         struct db_record *rec;
1024         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1025         int ret = 0;
1026         /* we have to give them a locked record to prevent races */
1027         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1028         if (rec && rec->value.dsize > 0) {
1029                 ret = state->fn(rec, state->private_data);
1030         }
1031         talloc_free(tmp_ctx);
1032         return ret;
1033 }
1034
1035 static int db_ctdb_traverse(struct db_context *db,
1036                             int (*fn)(struct db_record *rec,
1037                                       void *private_data),
1038                             void *private_data)
1039 {
1040         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1041                                                         struct db_ctdb_ctx);
1042         struct traverse_state state;
1043
1044         state.db = db;
1045         state.fn = fn;
1046         state.private_data = private_data;
1047
1048         if (db->persistent) {
1049                 /* for persistent databases we don't need to do a ctdb traverse,
1050                    we can do a faster local traverse */
1051                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1052         }
1053
1054
1055         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1056         return 0;
1057 }
1058
1059 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1060 {
1061         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1062 }
1063
1064 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1065 {
1066         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1067 }
1068
1069 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1070 {
1071         struct traverse_state *state = (struct traverse_state *)private_data;
1072         struct db_record rec;
1073         rec.key = key;
1074         rec.value = data;
1075         rec.store = db_ctdb_store_deny;
1076         rec.delete_rec = db_ctdb_delete_deny;
1077         rec.private_data = state->db;
1078         state->fn(&rec, state->private_data);
1079 }
1080
1081 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1082                                         void *private_data)
1083 {
1084         struct traverse_state *state = (struct traverse_state *)private_data;
1085         struct db_record rec;
1086         rec.key = kbuf;
1087         rec.value = dbuf;
1088         rec.store = db_ctdb_store_deny;
1089         rec.delete_rec = db_ctdb_delete_deny;
1090         rec.private_data = state->db;
1091
1092         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1093                 /* a deleted record */
1094                 return 0;
1095         }
1096         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1097         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1098
1099         return state->fn(&rec, state->private_data);
1100 }
1101
1102 static int db_ctdb_traverse_read(struct db_context *db,
1103                                  int (*fn)(struct db_record *rec,
1104                                            void *private_data),
1105                                  void *private_data)
1106 {
1107         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1108                                                         struct db_ctdb_ctx);
1109         struct traverse_state state;
1110
1111         state.db = db;
1112         state.fn = fn;
1113         state.private_data = private_data;
1114
1115         if (db->persistent) {
1116                 /* for persistent databases we don't need to do a ctdb traverse,
1117                    we can do a faster local traverse */
1118                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1119         }
1120
1121         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1122         return 0;
1123 }
1124
1125 static int db_ctdb_get_seqnum(struct db_context *db)
1126 {
1127         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1128                                                         struct db_ctdb_ctx);
1129         return tdb_get_seqnum(ctx->wtdb->tdb);
1130 }
1131
1132 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1133                                 const char *name,
1134                                 int hash_size, int tdb_flags,
1135                                 int open_flags, mode_t mode)
1136 {
1137         struct db_context *result;
1138         struct db_ctdb_ctx *db_ctdb;
1139         char *db_path;
1140
1141         if (!lp_clustering()) {
1142                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1143                 return NULL;
1144         }
1145
1146         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1147                 DEBUG(0, ("talloc failed\n"));
1148                 TALLOC_FREE(result);
1149                 return NULL;
1150         }
1151
1152         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1153                 DEBUG(0, ("talloc failed\n"));
1154                 TALLOC_FREE(result);
1155                 return NULL;
1156         }
1157
1158         db_ctdb->transaction = NULL;
1159         db_ctdb->db = result;
1160
1161         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1162                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1163                 TALLOC_FREE(result);
1164                 return NULL;
1165         }
1166
1167         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1168
1169         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1170
1171         /* only pass through specific flags */
1172         tdb_flags &= TDB_SEQNUM;
1173
1174         /* honor permissions if user has specified O_CREAT */
1175         if (open_flags & O_CREAT) {
1176                 chmod(db_path, mode);
1177         }
1178
1179         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1180         if (db_ctdb->wtdb == NULL) {
1181                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1182                 TALLOC_FREE(result);
1183                 return NULL;
1184         }
1185         talloc_free(db_path);
1186
1187         result->private_data = (void *)db_ctdb;
1188         result->fetch_locked = db_ctdb_fetch_locked;
1189         result->fetch = db_ctdb_fetch;
1190         result->traverse = db_ctdb_traverse;
1191         result->traverse_read = db_ctdb_traverse_read;
1192         result->get_seqnum = db_ctdb_get_seqnum;
1193         result->transaction_start = db_ctdb_transaction_start;
1194         result->transaction_commit = db_ctdb_transaction_commit;
1195         result->transaction_cancel = db_ctdb_transaction_cancel;
1196
1197         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1198                  name, db_ctdb->db_id));
1199
1200         return result;
1201 }
1202 #endif