handle two special cases
[samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct db_context *db;
38         struct tdb_wrap *wtdb;
39         uint32 db_id;
40         struct db_ctdb_transaction_handle *transaction;
41 };
42
43 struct db_ctdb_rec {
44         struct db_ctdb_ctx *ctdb_ctx;
45         struct ctdb_ltdb_header header;
46 };
47
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
49                                                TALLOC_CTX *mem_ctx,
50                                                TDB_DATA key,
51                                                bool persistent);
52
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
54 {
55         NTSTATUS status;
56         enum TDB_ERROR tret = tdb_error(tdb);
57
58         switch (tret) {
59         case TDB_ERR_EXISTS:
60                 status = NT_STATUS_OBJECT_NAME_COLLISION;
61                 break;
62         case TDB_ERR_NOEXIST:
63                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
64                 break;
65         default:
66                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
67                 break;
68         }
69
70         return status;
71 }
72
73
74
75 /*
76   form a ctdb_rec_data record from a key/data pair
77   
78   note that header may be NULL. If not NULL then it is included in the data portion
79   of the record
80  */
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
82                                                   TDB_DATA key, 
83                                                   struct ctdb_ltdb_header *header,
84                                                   TDB_DATA data)
85 {
86         size_t length;
87         struct ctdb_rec_data *d;
88
89         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
90                 data.dsize + (header?sizeof(*header):0);
91         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
92         if (d == NULL) {
93                 return NULL;
94         }
95         d->length = length;
96         d->reqid = reqid;
97         d->keylen = key.dsize;
98         memcpy(&d->data[0], key.dptr, key.dsize);
99         if (header) {
100                 d->datalen = data.dsize + sizeof(*header);
101                 memcpy(&d->data[key.dsize], header, sizeof(*header));
102                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103         } else {
104                 d->datalen = data.dsize;
105                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
106         }
107         return d;
108 }
109
110
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
113                                                struct ctdb_marshall_buffer *m,
114                                                uint64_t db_id,
115                                                uint32_t reqid,
116                                                TDB_DATA key,
117                                                struct ctdb_ltdb_header *header,
118                                                TDB_DATA data)
119 {
120         struct ctdb_rec_data *r;
121         size_t m_size, r_size;
122         struct ctdb_marshall_buffer *m2;
123
124         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
125         if (r == NULL) {
126                 talloc_free(m);
127                 return NULL;
128         }
129
130         if (m == NULL) {
131                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
132                 if (m == NULL) {
133                         return NULL;
134                 }
135                 m->db_id = db_id;
136         }
137
138         m_size = talloc_get_size(m);
139         r_size = talloc_get_size(r);
140
141         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
142         if (m2 == NULL) {
143                 talloc_free(m);
144                 return NULL;
145         }
146
147         memcpy(m_size + (uint8_t *)m2, r, r_size);
148
149         talloc_free(r);
150
151         m2->count++;
152
153         return m2;
154 }
155
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
158 {
159         TDB_DATA data;
160         data.dptr = (uint8_t *)m;
161         data.dsize = talloc_get_size(m);
162         return data;
163 }
164
165 /* 
166    loop over a marshalling buffer 
167    
168      - pass r==NULL to start
169      - loop the number of times indicated by m->count
170 */
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172                                                      uint32_t *reqid,
173                                                      struct ctdb_ltdb_header *header,
174                                                      TDB_DATA *key, TDB_DATA *data)
175 {
176         if (r == NULL) {
177                 r = (struct ctdb_rec_data *)&m->data[0];
178         } else {
179                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
180         }
181
182         if (reqid != NULL) {
183                 *reqid = r->reqid;
184         }
185         
186         if (key != NULL) {
187                 key->dptr   = &r->data[0];
188                 key->dsize  = r->keylen;
189         }
190         if (data != NULL) {
191                 data->dptr  = &r->data[r->keylen];
192                 data->dsize = r->datalen;
193                 if (header != NULL) {
194                         data->dptr += sizeof(*header);
195                         data->dsize -= sizeof(*header);
196                 }
197         }
198
199         if (header != NULL) {
200                 if (r->datalen < sizeof(*header)) {
201                         return NULL;
202                 }
203                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
204         }
205
206         return r;
207 }
208
209
210
211 /* start a transaction on a database */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 {
214         tdb_transaction_cancel(h->ctx->wtdb->tdb);
215         return 0;
216 }
217
218 /* start a transaction on a database */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 {
221         struct db_record *rh;
222         TDB_DATA key;
223         TALLOC_CTX *tmp_ctx;
224         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225         int ret;
226         struct db_ctdb_ctx *ctx = h->ctx;
227         TDB_DATA data;
228
229         key.dptr = discard_const(keyname);
230         key.dsize = strlen(keyname);
231
232 again:
233         tmp_ctx = talloc_new(h);
234
235         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236         if (rh == NULL) {
237                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
238                 talloc_free(tmp_ctx);
239                 return -1;
240         }
241         talloc_free(rh);
242
243         ret = tdb_transaction_start(ctx->wtdb->tdb);
244         if (ret != 0) {
245                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246                 talloc_free(tmp_ctx);
247                 return -1;
248         }
249
250         data = tdb_fetch(ctx->wtdb->tdb, key);
251         if ((data.dptr == NULL) ||
252             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254                 SAFE_FREE(data.dptr);
255                 tdb_transaction_cancel(ctx->wtdb->tdb);
256                 talloc_free(tmp_ctx);
257                 goto again;
258         }
259
260         SAFE_FREE(data.dptr);
261         talloc_free(tmp_ctx);
262
263         return 0;
264 }
265
266
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
269 {
270         struct db_ctdb_transaction_handle *h;
271         int ret;
272         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
273                                                         struct db_ctdb_ctx);
274
275         if (!db->persistent) {
276                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
277                          ctx->db_id));
278                 return -1;
279         }
280
281         if (ctx->transaction) {
282                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
283                 return -1;
284         }
285
286         h = talloc_zero(db, struct db_ctdb_transaction_handle);
287         if (h == NULL) {
288                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
289                 return -1;
290         }
291
292         h->ctx = ctx;
293
294         ret = db_ctdb_transaction_fetch_start(h);
295         if (ret != 0) {
296                 talloc_free(h);
297                 return -1;
298         }
299
300         talloc_set_destructor(h, db_ctdb_transaction_destructor);
301
302         ctx->transaction = h;
303
304         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
305
306         return 0;
307 }
308
309
310
311 /*
312   fetch a record inside a transaction
313  */
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
315                                      TALLOC_CTX *mem_ctx, 
316                                      TDB_DATA key, TDB_DATA *data)
317 {
318         struct db_ctdb_transaction_handle *h = db->transaction;
319
320         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321
322         if (data->dptr != NULL) {
323                 uint8_t *oldptr = (uint8_t *)data->dptr;
324                 data->dsize -= sizeof(struct ctdb_ltdb_header);
325                 if (data->dsize == 0) {
326                         data->dptr = NULL;
327                 } else {
328                         data->dptr = (uint8 *)
329                                 talloc_memdup(
330                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
331                                         data->dsize);
332                 }
333                 SAFE_FREE(oldptr);
334                 if (data->dptr == NULL && data->dsize != 0) {
335                         return -1;
336                 }
337         }
338
339         if (!h->in_replay) {
340                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341                 if (h->m_all == NULL) {
342                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
343                         data->dsize = 0;
344                         talloc_free(data->dptr);
345                         return -1;
346                 }
347         }
348
349         return 0;
350 }
351
352
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
355
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
357                                                           TALLOC_CTX *mem_ctx,
358                                                           TDB_DATA key)
359 {
360         struct db_record *result;
361         TDB_DATA ctdb_data;
362
363         if (!(result = talloc(mem_ctx, struct db_record))) {
364                 DEBUG(0, ("talloc failed\n"));
365                 return NULL;
366         }
367
368         result->private_data = ctx->transaction;
369
370         result->key.dsize = key.dsize;
371         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372         if (result->key.dptr == NULL) {
373                 DEBUG(0, ("talloc failed\n"));
374                 TALLOC_FREE(result);
375                 return NULL;
376         }
377
378         result->store = db_ctdb_store_transaction;
379         result->delete_rec = db_ctdb_delete_transaction;
380
381         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382         if (ctdb_data.dptr == NULL) {
383                 /* create the record */
384                 result->value = tdb_null;
385                 return result;
386         }
387
388         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389         result->value.dptr = NULL;
390
391         if ((result->value.dsize != 0)
392             && !(result->value.dptr = (uint8 *)talloc_memdup(
393                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394                          result->value.dsize))) {
395                 DEBUG(0, ("talloc failed\n"));
396                 TALLOC_FREE(result);
397         }
398
399         SAFE_FREE(ctdb_data.dptr);
400
401         return result;
402 }
403
404 static int db_ctdb_record_destructor(struct db_record *rec)
405 {
406         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407                 rec->private_data, struct db_ctdb_transaction_handle);
408         int ret = h->ctx->db->transaction_commit(h->ctx->db);
409         if (ret != 0) {
410                 DEBUG(0,(__location__ " transaction_commit failed\n"));
411         }
412         return 0;
413 }
414
415 /*
416   auto-create a transaction for persistent databases
417  */
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
419                                                          TALLOC_CTX *mem_ctx,
420                                                          TDB_DATA key)
421 {
422         int res;
423         struct db_record *rec;
424
425         res = db_ctdb_transaction_start(ctx->db);
426         if (res == -1) {
427                 return NULL;
428         }
429
430         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
431         if (rec == NULL) {
432                 ctx->db->transaction_cancel(ctx->db);           
433                 return NULL;
434         }
435
436         /* destroy this transaction when we release the lock */
437         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
438         return rec;
439 }
440
441
442 /*
443   stores a record inside a transaction
444  */
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
446                                      TDB_DATA key, TDB_DATA data)
447 {
448         TALLOC_CTX *tmp_ctx = talloc_new(h);
449         int ret;
450         TDB_DATA rec;
451         struct ctdb_ltdb_header header;
452
453         /* we need the header so we can update the RSN */
454         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455         if (rec.dptr == NULL) {
456                 /* the record doesn't exist - create one with us as dmaster.
457                    This is only safe because we are in a transaction and this
458                    is a persistent database */
459                 ZERO_STRUCT(header);
460                 header.dmaster = get_my_vnn();
461         } else {
462                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
463                 rec.dsize -= sizeof(struct ctdb_ltdb_header);
464                 /* a special case, we are writing the same data that is there now */
465                 if (data.dsize == rec.dsize &&
466                     memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
467                         SAFE_FREE(rec.dptr);
468                         talloc_free(tmp_ctx);
469                         return 0;
470                 }
471                 SAFE_FREE(rec.dptr);
472         }
473
474         header.rsn++;
475
476         if (!h->in_replay) {
477                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
478                 if (h->m_all == NULL) {
479                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
480                         talloc_free(tmp_ctx);
481                         return -1;
482                 }
483         }
484                 
485         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
486         if (h->m_write == NULL) {
487                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
488                 talloc_free(tmp_ctx);
489                 return -1;
490         }
491         
492         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
493         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
494         if (rec.dptr == NULL) {
495                 DEBUG(0,(__location__ " Failed to alloc record\n"));
496                 talloc_free(tmp_ctx);
497                 return -1;
498         }
499         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
500         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
501
502         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
503
504         talloc_free(tmp_ctx);
505         
506         return ret;
507 }
508
509
510 /* 
511    a record store inside a transaction
512  */
513 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
514 {
515         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
516                 rec->private_data, struct db_ctdb_transaction_handle);
517         int ret;
518
519         ret = db_ctdb_transaction_store(h, rec->key, data);
520         if (ret != 0) {
521                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
522         }
523         return NT_STATUS_OK;
524 }
525
526 /* 
527    a record delete inside a transaction
528  */
529 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
530 {
531         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
532                 rec->private_data, struct db_ctdb_transaction_handle);
533         int ret;
534
535         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
536         if (ret != 0) {
537                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
538         }
539         return NT_STATUS_OK;
540 }
541
542
543 /*
544   replay a transaction
545  */
546 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
547 {
548         int ret, i;
549         struct ctdb_rec_data *rec = NULL;
550
551         h->in_replay = true;
552         talloc_free(h->m_write);
553         h->m_write = NULL;
554
555         ret = db_ctdb_transaction_fetch_start(h);
556         if (ret != 0) {
557                 return ret;
558         }
559
560         for (i=0;i<h->m_all->count;i++) {
561                 TDB_DATA key, data;
562
563                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
564                 if (rec == NULL) {
565                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
566                         goto failed;
567                 }
568
569                 if (rec->reqid == 0) {
570                         /* its a store */
571                         if (db_ctdb_transaction_store(h, key, data) != 0) {
572                                 goto failed;
573                         }
574                 } else {
575                         TDB_DATA data2;
576                         TALLOC_CTX *tmp_ctx = talloc_new(h);
577
578                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
579                                 talloc_free(tmp_ctx);
580                                 goto failed;
581                         }
582                         if (data2.dsize != data.dsize ||
583                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
584                                 /* the record has changed on us - we have to give up */
585                                 talloc_free(tmp_ctx);
586                                 goto failed;
587                         }
588                         talloc_free(tmp_ctx);
589                 }
590         }
591         
592         return 0;
593
594 failed:
595         tdb_transaction_cancel(h->ctx->wtdb->tdb);
596         return -1;
597 }
598
599
600 /*
601   commit a transaction
602  */
603 static int db_ctdb_transaction_commit(struct db_context *db)
604 {
605         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
606                                                         struct db_ctdb_ctx);
607         NTSTATUS rets;
608         int ret;
609         int status;
610         int retries = 0;
611         struct db_ctdb_transaction_handle *h = ctx->transaction;
612         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
613
614         if (h == NULL) {
615                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
616                 return -1;
617         }
618
619         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
620
621         talloc_set_destructor(h, NULL);
622
623         /* our commit strategy is quite complex.
624
625            - we first try to commit the changes to all other nodes
626
627            - if that works, then we commit locally and we are done
628
629            - if a commit on another node fails, then we need to cancel
630              the transaction, then restart the transaction (thus
631              opening a window of time for a pending recovery to
632              complete), then replay the transaction, checking all the
633              reads and writes (checking that reads give the same data,
634              and writes succeed). Then we retry the transaction to the
635              other nodes
636         */
637
638 again:
639         if (h->m_write == NULL) {
640                 /* no changes were made, potentially after a retry */
641                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
642                 talloc_free(h);
643                 ctx->transaction = NULL;
644                 return 0;
645         }
646
647         /* tell ctdbd to commit to the other nodes */
648         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
649                                   CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
650                                   db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
651         if (!NT_STATUS_IS_OK(rets) || status != 0) {
652                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
653                 sleep(1);
654
655                 /* work out what error code we will give if we 
656                    have to fail the operation */
657                 switch ((enum ctdb_trans2_commit_error)status) {
658                 case CTDB_TRANS2_COMMIT_SUCCESS:
659                 case CTDB_TRANS2_COMMIT_SOMEFAIL:
660                 case CTDB_TRANS2_COMMIT_TIMEOUT:
661                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
662                         break;
663                 case CTDB_TRANS2_COMMIT_ALLFAIL:
664                         failure_control = CTDB_CONTROL_TRANS2_FINISHED;
665                         break;
666                 }
667
668                 if (ctdb_replay_transaction(h) != 0) {
669                         DEBUG(0,(__location__ " Failed to replay transaction\n"));
670                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
671                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
672                                             tdb_null, NULL, NULL, NULL);
673                         h->ctx->transaction = NULL;
674                         talloc_free(h);
675                         ctx->transaction = NULL;
676                         return -1;
677                 }
678                 if (++retries == 10) {
679                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries\n", 
680                                  h->ctx->db_id, retries));
681                         ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
682                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
683                                             tdb_null, NULL, NULL, NULL);
684                         h->ctx->transaction = NULL;
685                         talloc_free(h);
686                         ctx->transaction = NULL;
687                         return -1;                      
688                 }
689                 goto again;
690         }
691
692         /* do the real commit locally */
693         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
694         if (ret != 0) {
695                 DEBUG(0,(__location__ " Failed to commit transaction\n"));
696                 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id, 
697                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
698                 h->ctx->transaction = NULL;
699                 talloc_free(h);
700                 return ret;
701         }
702
703         /* tell ctdbd that we are finished with our local commit */
704         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
705                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
706                             tdb_null, NULL, NULL, NULL);
707         h->ctx->transaction = NULL;
708         talloc_free(h);
709         return 0;
710 }
711
712
713 /*
714   cancel a transaction
715  */
716 static int db_ctdb_transaction_cancel(struct db_context *db)
717 {
718         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
719                                                         struct db_ctdb_ctx);
720         struct db_ctdb_transaction_handle *h = ctx->transaction;
721
722         if (h == NULL) {
723                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
724                 return -1;
725         }
726
727         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
728
729         ctx->transaction = NULL;
730         talloc_free(h);
731         return 0;
732 }
733
734
735 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
736 {
737         struct db_ctdb_rec *crec = talloc_get_type_abort(
738                 rec->private_data, struct db_ctdb_rec);
739         TDB_DATA cdata;
740         int ret;
741
742         cdata.dsize = sizeof(crec->header) + data.dsize;
743
744         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
745                 return NT_STATUS_NO_MEMORY;
746         }
747
748         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
749         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
750
751         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
752
753         SAFE_FREE(cdata.dptr);
754
755         return (ret == 0) ? NT_STATUS_OK
756                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
757 }
758
759
760
761 static NTSTATUS db_ctdb_delete(struct db_record *rec)
762 {
763         TDB_DATA data;
764
765         /*
766          * We have to store the header with empty data. TODO: Fix the
767          * tdb-level cleanup
768          */
769
770         ZERO_STRUCT(data);
771
772         return db_ctdb_store(rec, data, 0);
773
774 }
775
776 static int db_ctdb_record_destr(struct db_record* data)
777 {
778         struct db_ctdb_rec *crec = talloc_get_type_abort(
779                 data->private_data, struct db_ctdb_rec);
780
781         DEBUG(10, (DEBUGLEVEL > 10
782                    ? "Unlocking db %u key %s\n"
783                    : "Unlocking db %u key %.20s\n",
784                    (int)crec->ctdb_ctx->db_id,
785                    hex_encode(data, (unsigned char *)data->key.dptr,
786                               data->key.dsize)));
787
788         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
789                 DEBUG(0, ("tdb_chainunlock failed\n"));
790                 return -1;
791         }
792
793         return 0;
794 }
795
796 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
797                                                TALLOC_CTX *mem_ctx,
798                                                TDB_DATA key,
799                                                bool persistent)
800 {
801         struct db_record *result;
802         struct db_ctdb_rec *crec;
803         NTSTATUS status;
804         TDB_DATA ctdb_data;
805         int migrate_attempts = 0;
806
807         if (!(result = talloc(mem_ctx, struct db_record))) {
808                 DEBUG(0, ("talloc failed\n"));
809                 return NULL;
810         }
811
812         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
813                 DEBUG(0, ("talloc failed\n"));
814                 TALLOC_FREE(result);
815                 return NULL;
816         }
817
818         result->private_data = (void *)crec;
819         crec->ctdb_ctx = ctx;
820
821         result->key.dsize = key.dsize;
822         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
823         if (result->key.dptr == NULL) {
824                 DEBUG(0, ("talloc failed\n"));
825                 TALLOC_FREE(result);
826                 return NULL;
827         }
828
829         /*
830          * Do a blocking lock on the record
831          */
832 again:
833
834         if (DEBUGLEVEL >= 10) {
835                 char *keystr = hex_encode(result, key.dptr, key.dsize);
836                 DEBUG(10, (DEBUGLEVEL > 10
837                            ? "Locking db %u key %s\n"
838                            : "Locking db %u key %.20s\n",
839                            (int)crec->ctdb_ctx->db_id, keystr));
840                 TALLOC_FREE(keystr);
841         }
842         
843         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
844                 DEBUG(3, ("tdb_chainlock failed\n"));
845                 TALLOC_FREE(result);
846                 return NULL;
847         }
848
849         result->store = db_ctdb_store;
850         result->delete_rec = db_ctdb_delete;
851         talloc_set_destructor(result, db_ctdb_record_destr);
852
853         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
854
855         /*
856          * See if we have a valid record and we are the dmaster. If so, we can
857          * take the shortcut and just return it.
858          */
859
860         if ((ctdb_data.dptr == NULL) ||
861             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
862             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
863 #if 0
864             || (random() % 2 != 0)
865 #endif
866 ) {
867                 SAFE_FREE(ctdb_data.dptr);
868                 tdb_chainunlock(ctx->wtdb->tdb, key);
869                 talloc_set_destructor(result, NULL);
870
871                 migrate_attempts += 1;
872
873                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
874                            ctdb_data.dptr, ctdb_data.dptr ?
875                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
876                            get_my_vnn()));
877
878                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
879                 if (!NT_STATUS_IS_OK(status)) {
880                         DEBUG(5, ("ctdb_migrate failed: %s\n",
881                                   nt_errstr(status)));
882                         TALLOC_FREE(result);
883                         return NULL;
884                 }
885                 /* now its migrated, try again */
886                 goto again;
887         }
888
889         if (migrate_attempts > 10) {
890                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
891                           migrate_attempts));
892         }
893
894         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
895
896         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
897         result->value.dptr = NULL;
898
899         if ((result->value.dsize != 0)
900             && !(result->value.dptr = (uint8 *)talloc_memdup(
901                          result, ctdb_data.dptr + sizeof(crec->header),
902                          result->value.dsize))) {
903                 DEBUG(0, ("talloc failed\n"));
904                 TALLOC_FREE(result);
905         }
906
907         SAFE_FREE(ctdb_data.dptr);
908
909         return result;
910 }
911
912 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
913                                               TALLOC_CTX *mem_ctx,
914                                               TDB_DATA key)
915 {
916         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
917                                                         struct db_ctdb_ctx);
918
919         if (ctx->transaction != NULL) {
920                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
921         }
922
923         if (db->persistent) {
924                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
925         }
926
927         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
928 }
929
930 /*
931   fetch (unlocked, no migration) operation on ctdb
932  */
933 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
934                          TDB_DATA key, TDB_DATA *data)
935 {
936         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
937                                                         struct db_ctdb_ctx);
938         NTSTATUS status;
939         TDB_DATA ctdb_data;
940
941         if (ctx->transaction) {
942                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
943         }
944
945         /* try a direct fetch */
946         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
947
948         /*
949          * See if we have a valid record and we are the dmaster. If so, we can
950          * take the shortcut and just return it.
951          * we bypass the dmaster check for persistent databases
952          */
953         if ((ctdb_data.dptr != NULL) &&
954             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
955             (db->persistent ||
956              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
957                 /* we are the dmaster - avoid the ctdb protocol op */
958
959                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
960                 if (data->dsize == 0) {
961                         SAFE_FREE(ctdb_data.dptr);
962                         data->dptr = NULL;
963                         return 0;
964                 }
965
966                 data->dptr = (uint8 *)talloc_memdup(
967                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
968                         data->dsize);
969
970                 SAFE_FREE(ctdb_data.dptr);
971
972                 if (data->dptr == NULL) {
973                         return -1;
974                 }
975                 return 0;
976         }
977
978         SAFE_FREE(ctdb_data.dptr);
979
980         /* we weren't able to get it locally - ask ctdb to fetch it for us */
981         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
982         if (!NT_STATUS_IS_OK(status)) {
983                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
984                 return -1;
985         }
986
987         return 0;
988 }
989
990 struct traverse_state {
991         struct db_context *db;
992         int (*fn)(struct db_record *rec, void *private_data);
993         void *private_data;
994 };
995
996 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
997 {
998         struct traverse_state *state = (struct traverse_state *)private_data;
999         struct db_record *rec;
1000         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1001         /* we have to give them a locked record to prevent races */
1002         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1003         if (rec && rec->value.dsize > 0) {
1004                 state->fn(rec, state->private_data);
1005         }
1006         talloc_free(tmp_ctx);
1007 }
1008
1009 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1010                                         void *private_data)
1011 {
1012         struct traverse_state *state = (struct traverse_state *)private_data;
1013         struct db_record *rec;
1014         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1015         int ret = 0;
1016         /* we have to give them a locked record to prevent races */
1017         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1018         if (rec && rec->value.dsize > 0) {
1019                 ret = state->fn(rec, state->private_data);
1020         }
1021         talloc_free(tmp_ctx);
1022         return ret;
1023 }
1024
1025 static int db_ctdb_traverse(struct db_context *db,
1026                             int (*fn)(struct db_record *rec,
1027                                       void *private_data),
1028                             void *private_data)
1029 {
1030         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1031                                                         struct db_ctdb_ctx);
1032         struct traverse_state state;
1033
1034         state.db = db;
1035         state.fn = fn;
1036         state.private_data = private_data;
1037
1038         if (db->persistent) {
1039                 /* for persistent databases we don't need to do a ctdb traverse,
1040                    we can do a faster local traverse */
1041                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1042         }
1043
1044
1045         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1046         return 0;
1047 }
1048
1049 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1050 {
1051         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1052 }
1053
1054 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1055 {
1056         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1057 }
1058
1059 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1060 {
1061         struct traverse_state *state = (struct traverse_state *)private_data;
1062         struct db_record rec;
1063         rec.key = key;
1064         rec.value = data;
1065         rec.store = db_ctdb_store_deny;
1066         rec.delete_rec = db_ctdb_delete_deny;
1067         rec.private_data = state->db;
1068         state->fn(&rec, state->private_data);
1069 }
1070
1071 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1072                                         void *private_data)
1073 {
1074         struct traverse_state *state = (struct traverse_state *)private_data;
1075         struct db_record rec;
1076         rec.key = kbuf;
1077         rec.value = dbuf;
1078         rec.store = db_ctdb_store_deny;
1079         rec.delete_rec = db_ctdb_delete_deny;
1080         rec.private_data = state->db;
1081
1082         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1083                 /* a deleted record */
1084                 return 0;
1085         }
1086         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1087         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1088
1089         return state->fn(&rec, state->private_data);
1090 }
1091
1092 static int db_ctdb_traverse_read(struct db_context *db,
1093                                  int (*fn)(struct db_record *rec,
1094                                            void *private_data),
1095                                  void *private_data)
1096 {
1097         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1098                                                         struct db_ctdb_ctx);
1099         struct traverse_state state;
1100
1101         state.db = db;
1102         state.fn = fn;
1103         state.private_data = private_data;
1104
1105         if (db->persistent) {
1106                 /* for persistent databases we don't need to do a ctdb traverse,
1107                    we can do a faster local traverse */
1108                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1109         }
1110
1111         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1112         return 0;
1113 }
1114
1115 static int db_ctdb_get_seqnum(struct db_context *db)
1116 {
1117         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1118                                                         struct db_ctdb_ctx);
1119         return tdb_get_seqnum(ctx->wtdb->tdb);
1120 }
1121
1122 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1123                                 const char *name,
1124                                 int hash_size, int tdb_flags,
1125                                 int open_flags, mode_t mode)
1126 {
1127         struct db_context *result;
1128         struct db_ctdb_ctx *db_ctdb;
1129         char *db_path;
1130
1131         if (!lp_clustering()) {
1132                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1133                 return NULL;
1134         }
1135
1136         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1137                 DEBUG(0, ("talloc failed\n"));
1138                 TALLOC_FREE(result);
1139                 return NULL;
1140         }
1141
1142         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1143                 DEBUG(0, ("talloc failed\n"));
1144                 TALLOC_FREE(result);
1145                 return NULL;
1146         }
1147
1148         db_ctdb->transaction = NULL;
1149         db_ctdb->db = result;
1150
1151         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1152                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1153                 TALLOC_FREE(result);
1154                 return NULL;
1155         }
1156
1157         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1158
1159         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1160
1161         /* only pass through specific flags */
1162         tdb_flags &= TDB_SEQNUM;
1163
1164         /* honor permissions if user has specified O_CREAT */
1165         if (open_flags & O_CREAT) {
1166                 chmod(db_path, mode);
1167         }
1168
1169         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1170         if (db_ctdb->wtdb == NULL) {
1171                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1172                 TALLOC_FREE(result);
1173                 return NULL;
1174         }
1175         talloc_free(db_path);
1176
1177         result->private_data = (void *)db_ctdb;
1178         result->fetch_locked = db_ctdb_fetch_locked;
1179         result->fetch = db_ctdb_fetch;
1180         result->traverse = db_ctdb_traverse;
1181         result->traverse_read = db_ctdb_traverse_read;
1182         result->get_seqnum = db_ctdb_get_seqnum;
1183         result->transaction_start = db_ctdb_transaction_start;
1184         result->transaction_commit = db_ctdb_transaction_commit;
1185         result->transaction_cancel = db_ctdb_transaction_cancel;
1186
1187         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1188                  name, db_ctdb->db_id));
1189
1190         return result;
1191 }
1192 #endif