put a limit on the number of retries. I found a case where a recovery
[amitay/samba.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct db_context *db;
38         struct tdb_wrap *wtdb;
39         uint32 db_id;
40         struct db_ctdb_transaction_handle *transaction;
41 };
42
43 struct db_ctdb_rec {
44         struct db_ctdb_ctx *ctdb_ctx;
45         struct ctdb_ltdb_header header;
46 };
47
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
49                                                TALLOC_CTX *mem_ctx,
50                                                TDB_DATA key,
51                                                bool persistent);
52
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
54 {
55         NTSTATUS status;
56         enum TDB_ERROR tret = tdb_error(tdb);
57
58         switch (tret) {
59         case TDB_ERR_EXISTS:
60                 status = NT_STATUS_OBJECT_NAME_COLLISION;
61                 break;
62         case TDB_ERR_NOEXIST:
63                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
64                 break;
65         default:
66                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
67                 break;
68         }
69
70         return status;
71 }
72
73
74
75 /*
76   form a ctdb_rec_data record from a key/data pair
77   
78   note that header may be NULL. If not NULL then it is included in the data portion
79   of the record
80  */
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
82                                                   TDB_DATA key, 
83                                                   struct ctdb_ltdb_header *header,
84                                                   TDB_DATA data)
85 {
86         size_t length;
87         struct ctdb_rec_data *d;
88
89         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
90                 data.dsize + (header?sizeof(*header):0);
91         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
92         if (d == NULL) {
93                 return NULL;
94         }
95         d->length = length;
96         d->reqid = reqid;
97         d->keylen = key.dsize;
98         memcpy(&d->data[0], key.dptr, key.dsize);
99         if (header) {
100                 d->datalen = data.dsize + sizeof(*header);
101                 memcpy(&d->data[key.dsize], header, sizeof(*header));
102                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103         } else {
104                 d->datalen = data.dsize;
105                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
106         }
107         return d;
108 }
109
110
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
113                                                struct ctdb_marshall_buffer *m,
114                                                uint64_t db_id,
115                                                uint32_t reqid,
116                                                TDB_DATA key,
117                                                struct ctdb_ltdb_header *header,
118                                                TDB_DATA data)
119 {
120         struct ctdb_rec_data *r;
121         size_t m_size, r_size;
122         struct ctdb_marshall_buffer *m2;
123
124         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
125         if (r == NULL) {
126                 talloc_free(m);
127                 return NULL;
128         }
129
130         if (m == NULL) {
131                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
132                 if (m == NULL) {
133                         return NULL;
134                 }
135                 m->db_id = db_id;
136         }
137
138         m_size = talloc_get_size(m);
139         r_size = talloc_get_size(r);
140
141         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
142         if (m2 == NULL) {
143                 talloc_free(m);
144                 return NULL;
145         }
146
147         memcpy(m_size + (uint8_t *)m2, r, r_size);
148
149         talloc_free(r);
150
151         m2->count++;
152
153         return m2;
154 }
155
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
158 {
159         TDB_DATA data;
160         data.dptr = (uint8_t *)m;
161         data.dsize = talloc_get_size(m);
162         return data;
163 }
164
165 /* 
166    loop over a marshalling buffer 
167    
168      - pass r==NULL to start
169      - loop the number of times indicated by m->count
170 */
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172                                                      uint32_t *reqid,
173                                                      struct ctdb_ltdb_header *header,
174                                                      TDB_DATA *key, TDB_DATA *data)
175 {
176         if (r == NULL) {
177                 r = (struct ctdb_rec_data *)&m->data[0];
178         } else {
179                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
180         }
181
182         if (reqid != NULL) {
183                 *reqid = r->reqid;
184         }
185         
186         if (key != NULL) {
187                 key->dptr   = &r->data[0];
188                 key->dsize  = r->keylen;
189         }
190         if (data != NULL) {
191                 data->dptr  = &r->data[r->keylen];
192                 data->dsize = r->datalen;
193                 if (header != NULL) {
194                         data->dptr += sizeof(*header);
195                         data->dsize -= sizeof(*header);
196                 }
197         }
198
199         if (header != NULL) {
200                 if (r->datalen < sizeof(*header)) {
201                         return NULL;
202                 }
203                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
204         }
205
206         return r;
207 }
208
209
210
211 /* start a transaction on a database */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 {
214         tdb_transaction_cancel(h->ctx->wtdb->tdb);
215         return 0;
216 }
217
218 /* start a transaction on a database */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 {
221         struct db_record *rh;
222         TDB_DATA key;
223         TALLOC_CTX *tmp_ctx;
224         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225         int ret;
226         struct db_ctdb_ctx *ctx = h->ctx;
227         TDB_DATA data;
228
229         key.dptr = discard_const(keyname);
230         key.dsize = strlen(keyname);
231
232 again:
233         tmp_ctx = talloc_new(h);
234
235         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236         if (rh == NULL) {
237                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
238                 talloc_free(tmp_ctx);
239                 return -1;
240         }
241         talloc_free(rh);
242
243         ret = tdb_transaction_start(ctx->wtdb->tdb);
244         if (ret != 0) {
245                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246                 talloc_free(tmp_ctx);
247                 return -1;
248         }
249
250         data = tdb_fetch(ctx->wtdb->tdb, key);
251         if ((data.dptr == NULL) ||
252             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254                 SAFE_FREE(data.dptr);
255                 tdb_transaction_cancel(ctx->wtdb->tdb);
256                 talloc_free(tmp_ctx);
257                 goto again;
258         }
259
260         SAFE_FREE(data.dptr);
261         talloc_free(tmp_ctx);
262
263         return 0;
264 }
265
266
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
269 {
270         struct db_ctdb_transaction_handle *h;
271         int ret;
272         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
273                                                         struct db_ctdb_ctx);
274
275         if (!db->persistent) {
276                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
277                          ctx->db_id));
278                 return -1;
279         }
280
281         if (ctx->transaction) {
282                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
283                 return -1;
284         }
285
286         h = talloc_zero(db, struct db_ctdb_transaction_handle);
287         if (h == NULL) {
288                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
289                 return -1;
290         }
291
292         h->ctx = ctx;
293
294         ret = db_ctdb_transaction_fetch_start(h);
295         if (ret != 0) {
296                 talloc_free(h);
297                 return -1;
298         }
299
300         talloc_set_destructor(h, db_ctdb_transaction_destructor);
301
302         ctx->transaction = h;
303
304         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
305
306         return 0;
307 }
308
309
310
311 /*
312   fetch a record inside a transaction
313  */
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
315                                      TALLOC_CTX *mem_ctx, 
316                                      TDB_DATA key, TDB_DATA *data)
317 {
318         struct db_ctdb_transaction_handle *h = db->transaction;
319
320         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321
322         if (data->dptr != NULL) {
323                 uint8_t *oldptr = (uint8_t *)data->dptr;
324                 data->dsize -= sizeof(struct ctdb_ltdb_header);
325                 if (data->dsize == 0) {
326                         data->dptr = NULL;
327                 } else {
328                         data->dptr = (uint8 *)
329                                 talloc_memdup(
330                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
331                                         data->dsize);
332                 }
333                 SAFE_FREE(oldptr);
334                 if (data->dptr == NULL && data->dsize != 0) {
335                         return -1;
336                 }
337         }
338
339         if (!h->in_replay) {
340                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341                 if (h->m_all == NULL) {
342                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
343                         data->dsize = 0;
344                         talloc_free(data->dptr);
345                         return -1;
346                 }
347         }
348
349         return 0;
350 }
351
352
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
355
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
357                                                           TALLOC_CTX *mem_ctx,
358                                                           TDB_DATA key)
359 {
360         struct db_record *result;
361         TDB_DATA ctdb_data;
362
363         if (!(result = talloc(mem_ctx, struct db_record))) {
364                 DEBUG(0, ("talloc failed\n"));
365                 return NULL;
366         }
367
368         result->private_data = ctx->transaction;
369
370         result->key.dsize = key.dsize;
371         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372         if (result->key.dptr == NULL) {
373                 DEBUG(0, ("talloc failed\n"));
374                 TALLOC_FREE(result);
375                 return NULL;
376         }
377
378         result->store = db_ctdb_store_transaction;
379         result->delete_rec = db_ctdb_delete_transaction;
380
381         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382         if (ctdb_data.dptr == NULL) {
383                 /* create the record */
384                 result->value = tdb_null;
385                 return result;
386         }
387
388         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389         result->value.dptr = NULL;
390
391         if ((result->value.dsize != 0)
392             && !(result->value.dptr = (uint8 *)talloc_memdup(
393                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394                          result->value.dsize))) {
395                 DEBUG(0, ("talloc failed\n"));
396                 TALLOC_FREE(result);
397         }
398
399         SAFE_FREE(ctdb_data.dptr);
400
401         return result;
402 }
403
404 static int db_ctdb_record_destructor(struct db_record *rec)
405 {
406         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407                 rec->private_data, struct db_ctdb_transaction_handle);
408         int ret = h->ctx->db->transaction_commit(h->ctx->db);
409         if (ret != 0) {
410                 DEBUG(0,(__location__ " transaction_commit failed\n"));
411         }
412         return 0;
413 }
414
415 /*
416   auto-create a transaction for persistent databases
417  */
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
419                                                          TALLOC_CTX *mem_ctx,
420                                                          TDB_DATA key)
421 {
422         int res;
423         struct db_record *rec;
424
425         res = db_ctdb_transaction_start(ctx->db);
426         if (res == -1) {
427                 return NULL;
428         }
429
430         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
431         if (rec == NULL) {
432                 ctx->db->transaction_cancel(ctx->db);           
433                 return NULL;
434         }
435
436         /* destroy this transaction when we release the lock */
437         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
438         return rec;
439 }
440
441
442 /*
443   stores a record inside a transaction
444  */
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
446                                      TDB_DATA key, TDB_DATA data)
447 {
448         TALLOC_CTX *tmp_ctx = talloc_new(h);
449         int ret;
450         TDB_DATA rec;
451         struct ctdb_ltdb_header header;
452
453         /* we need the header so we can update the RSN */
454         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455         if (rec.dptr == NULL) {
456                 /* the record doesn't exist - create one with us as dmaster.
457                    This is only safe because we are in a transaction and this
458                    is a persistent database */
459                 ZERO_STRUCT(header);
460                 header.dmaster = get_my_vnn();
461         } else {
462                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
463                 SAFE_FREE(rec.dptr);
464         }
465
466         header.rsn++;
467
468         if (!h->in_replay) {
469                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
470                 if (h->m_all == NULL) {
471                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
472                         talloc_free(tmp_ctx);
473                         return -1;
474                 }
475                 
476                 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
477                 if (h->m_write == NULL) {
478                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
479                         talloc_free(tmp_ctx);
480                         return -1;
481                 }
482         }
483         
484         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
485         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
486         if (rec.dptr == NULL) {
487                 DEBUG(0,(__location__ " Failed to alloc record\n"));
488                 talloc_free(tmp_ctx);
489                 return -1;
490         }
491         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
492         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
493
494         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
495
496         talloc_free(tmp_ctx);
497         
498         return ret;
499 }
500
501
502 /* 
503    a record store inside a transaction
504  */
505 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
506 {
507         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
508                 rec->private_data, struct db_ctdb_transaction_handle);
509         int ret;
510
511         ret = db_ctdb_transaction_store(h, rec->key, data);
512         if (ret != 0) {
513                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
514         }
515         return NT_STATUS_OK;
516 }
517
518 /* 
519    a record delete inside a transaction
520  */
521 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
522 {
523         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
524                 rec->private_data, struct db_ctdb_transaction_handle);
525         int ret;
526
527         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
528         if (ret != 0) {
529                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
530         }
531         return NT_STATUS_OK;
532 }
533
534
535 /*
536   replay a transaction
537  */
538 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
539 {
540         int ret, i;
541         struct ctdb_rec_data *rec = NULL;
542
543         h->in_replay = true;
544
545         ret = db_ctdb_transaction_fetch_start(h);
546         if (ret != 0) {
547                 return ret;
548         }
549
550         for (i=0;i<h->m_all->count;i++) {
551                 TDB_DATA key, data;
552
553                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
554                 if (rec == NULL) {
555                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
556                         goto failed;
557                 }
558
559                 if (rec->reqid == 0) {
560                         /* its a store */
561                         if (db_ctdb_transaction_store(h, key, data) != 0) {
562                                 goto failed;
563                         }
564                 } else {
565                         TDB_DATA data2;
566                         TALLOC_CTX *tmp_ctx = talloc_new(h);
567
568                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
569                                 talloc_free(tmp_ctx);
570                                 goto failed;
571                         }
572                         if (data2.dsize != data.dsize ||
573                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
574                                 /* the record has changed on us - we have to give up */
575                                 talloc_free(tmp_ctx);
576                                 goto failed;
577                         }
578                         talloc_free(tmp_ctx);
579                 }
580         }
581         
582         return 0;
583
584 failed:
585         tdb_transaction_cancel(h->ctx->wtdb->tdb);
586         return -1;
587 }
588
589
590 /*
591   commit a transaction
592  */
593 static int db_ctdb_transaction_commit(struct db_context *db)
594 {
595         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
596                                                         struct db_ctdb_ctx);
597         NTSTATUS rets;
598         int ret;
599         int status;
600         int retries = 0;
601         struct db_ctdb_transaction_handle *h = ctx->transaction;
602
603         if (h == NULL) {
604                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
605                 return -1;
606         }
607
608         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
609
610         if (h->m_write == NULL) {
611                 /* no changes were made */
612                 talloc_free(h);
613                 ctx->transaction = NULL;
614                 return 0;
615         }
616
617         talloc_set_destructor(h, NULL);
618
619         /* our commit strategy is quite complex.
620
621            - we first try to commit the changes to all other nodes
622
623            - if that works, then we commit locally and we are done
624
625            - if a commit on another node fails, then we need to cancel
626              the transaction, then restart the transaction (thus
627              opening a window of time for a pending recovery to
628              complete), then replay the transaction, checking all the
629              reads and writes (checking that reads give the same data,
630              and writes succeed). Then we retry the transaction to the
631              other nodes
632         */
633
634 again:
635         /* tell ctdbd to commit to the other nodes */
636         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
637                                   CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
638                                   db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
639         if (!NT_STATUS_IS_OK(rets) || status != 0) {
640                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
641                 sleep(1);
642                 if (ctdb_replay_transaction(h) != 0) {
643                         DEBUG(0,(__location__ " Failed to replay transaction\n"));
644                         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
645                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
646                                             tdb_null, NULL, NULL, NULL);
647                         h->ctx->transaction = NULL;
648                         talloc_free(h);
649                         ctx->transaction = NULL;
650                         return -1;
651                 }
652                 if (retries++ == 10) {
653                         DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries\n", 
654                                  h->ctx->db_id, retries));
655                         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
656                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
657                                             tdb_null, NULL, NULL, NULL);
658                         h->ctx->transaction = NULL;
659                         talloc_free(h);
660                         ctx->transaction = NULL;
661                         return -1;                      
662                 }
663                 goto again;
664         }
665
666         /* do the real commit locally */
667         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
668         if (ret != 0) {
669                 DEBUG(0,(__location__ " Failed to commit transaction\n"));
670                 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id, 
671                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
672                 h->ctx->transaction = NULL;
673                 talloc_free(h);
674                 return ret;
675         }
676
677         /* tell ctdbd that we are finished with our local commit */
678         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
679                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
680                             tdb_null, NULL, NULL, NULL);
681         h->ctx->transaction = NULL;
682         talloc_free(h);
683         return 0;
684 }
685
686
687 /*
688   cancel a transaction
689  */
690 static int db_ctdb_transaction_cancel(struct db_context *db)
691 {
692         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
693                                                         struct db_ctdb_ctx);
694         struct db_ctdb_transaction_handle *h = ctx->transaction;
695
696         if (h == NULL) {
697                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
698                 return -1;
699         }
700
701         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
702
703         ctx->transaction = NULL;
704         talloc_free(h);
705         return 0;
706 }
707
708
709 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
710 {
711         struct db_ctdb_rec *crec = talloc_get_type_abort(
712                 rec->private_data, struct db_ctdb_rec);
713         TDB_DATA cdata;
714         int ret;
715
716         cdata.dsize = sizeof(crec->header) + data.dsize;
717
718         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
719                 return NT_STATUS_NO_MEMORY;
720         }
721
722         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
723         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
724
725         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
726
727         SAFE_FREE(cdata.dptr);
728
729         return (ret == 0) ? NT_STATUS_OK
730                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
731 }
732
733
734
735 static NTSTATUS db_ctdb_delete(struct db_record *rec)
736 {
737         TDB_DATA data;
738
739         /*
740          * We have to store the header with empty data. TODO: Fix the
741          * tdb-level cleanup
742          */
743
744         ZERO_STRUCT(data);
745
746         return db_ctdb_store(rec, data, 0);
747
748 }
749
750 static int db_ctdb_record_destr(struct db_record* data)
751 {
752         struct db_ctdb_rec *crec = talloc_get_type_abort(
753                 data->private_data, struct db_ctdb_rec);
754
755         DEBUG(10, (DEBUGLEVEL > 10
756                    ? "Unlocking db %u key %s\n"
757                    : "Unlocking db %u key %.20s\n",
758                    (int)crec->ctdb_ctx->db_id,
759                    hex_encode(data, (unsigned char *)data->key.dptr,
760                               data->key.dsize)));
761
762         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
763                 DEBUG(0, ("tdb_chainunlock failed\n"));
764                 return -1;
765         }
766
767         return 0;
768 }
769
770 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
771                                                TALLOC_CTX *mem_ctx,
772                                                TDB_DATA key,
773                                                bool persistent)
774 {
775         struct db_record *result;
776         struct db_ctdb_rec *crec;
777         NTSTATUS status;
778         TDB_DATA ctdb_data;
779         int migrate_attempts = 0;
780
781         if (!(result = talloc(mem_ctx, struct db_record))) {
782                 DEBUG(0, ("talloc failed\n"));
783                 return NULL;
784         }
785
786         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
787                 DEBUG(0, ("talloc failed\n"));
788                 TALLOC_FREE(result);
789                 return NULL;
790         }
791
792         result->private_data = (void *)crec;
793         crec->ctdb_ctx = ctx;
794
795         result->key.dsize = key.dsize;
796         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
797         if (result->key.dptr == NULL) {
798                 DEBUG(0, ("talloc failed\n"));
799                 TALLOC_FREE(result);
800                 return NULL;
801         }
802
803         /*
804          * Do a blocking lock on the record
805          */
806 again:
807
808         if (DEBUGLEVEL >= 10) {
809                 char *keystr = hex_encode(result, key.dptr, key.dsize);
810                 DEBUG(10, (DEBUGLEVEL > 10
811                            ? "Locking db %u key %s\n"
812                            : "Locking db %u key %.20s\n",
813                            (int)crec->ctdb_ctx->db_id, keystr));
814                 TALLOC_FREE(keystr);
815         }
816         
817         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
818                 DEBUG(3, ("tdb_chainlock failed\n"));
819                 TALLOC_FREE(result);
820                 return NULL;
821         }
822
823         result->store = db_ctdb_store;
824         result->delete_rec = db_ctdb_delete;
825         talloc_set_destructor(result, db_ctdb_record_destr);
826
827         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
828
829         /*
830          * See if we have a valid record and we are the dmaster. If so, we can
831          * take the shortcut and just return it.
832          */
833
834         if ((ctdb_data.dptr == NULL) ||
835             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
836             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
837 #if 0
838             || (random() % 2 != 0)
839 #endif
840 ) {
841                 SAFE_FREE(ctdb_data.dptr);
842                 tdb_chainunlock(ctx->wtdb->tdb, key);
843                 talloc_set_destructor(result, NULL);
844
845                 migrate_attempts += 1;
846
847                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
848                            ctdb_data.dptr, ctdb_data.dptr ?
849                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
850                            get_my_vnn()));
851
852                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
853                 if (!NT_STATUS_IS_OK(status)) {
854                         DEBUG(5, ("ctdb_migrate failed: %s\n",
855                                   nt_errstr(status)));
856                         TALLOC_FREE(result);
857                         return NULL;
858                 }
859                 /* now its migrated, try again */
860                 goto again;
861         }
862
863         if (migrate_attempts > 10) {
864                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
865                           migrate_attempts));
866         }
867
868         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
869
870         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
871         result->value.dptr = NULL;
872
873         if ((result->value.dsize != 0)
874             && !(result->value.dptr = (uint8 *)talloc_memdup(
875                          result, ctdb_data.dptr + sizeof(crec->header),
876                          result->value.dsize))) {
877                 DEBUG(0, ("talloc failed\n"));
878                 TALLOC_FREE(result);
879         }
880
881         SAFE_FREE(ctdb_data.dptr);
882
883         return result;
884 }
885
886 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
887                                               TALLOC_CTX *mem_ctx,
888                                               TDB_DATA key)
889 {
890         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
891                                                         struct db_ctdb_ctx);
892
893         if (ctx->transaction != NULL) {
894                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
895         }
896
897         if (db->persistent) {
898                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
899         }
900
901         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
902 }
903
904 /*
905   fetch (unlocked, no migration) operation on ctdb
906  */
907 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
908                          TDB_DATA key, TDB_DATA *data)
909 {
910         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
911                                                         struct db_ctdb_ctx);
912         NTSTATUS status;
913         TDB_DATA ctdb_data;
914
915         if (ctx->transaction) {
916                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
917         }
918
919         /* try a direct fetch */
920         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
921
922         /*
923          * See if we have a valid record and we are the dmaster. If so, we can
924          * take the shortcut and just return it.
925          * we bypass the dmaster check for persistent databases
926          */
927         if ((ctdb_data.dptr != NULL) &&
928             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
929             (db->persistent ||
930              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
931                 /* we are the dmaster - avoid the ctdb protocol op */
932
933                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
934                 if (data->dsize == 0) {
935                         SAFE_FREE(ctdb_data.dptr);
936                         data->dptr = NULL;
937                         return 0;
938                 }
939
940                 data->dptr = (uint8 *)talloc_memdup(
941                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
942                         data->dsize);
943
944                 SAFE_FREE(ctdb_data.dptr);
945
946                 if (data->dptr == NULL) {
947                         return -1;
948                 }
949                 return 0;
950         }
951
952         SAFE_FREE(ctdb_data.dptr);
953
954         /* we weren't able to get it locally - ask ctdb to fetch it for us */
955         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
956         if (!NT_STATUS_IS_OK(status)) {
957                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
958                 return -1;
959         }
960
961         return 0;
962 }
963
964 struct traverse_state {
965         struct db_context *db;
966         int (*fn)(struct db_record *rec, void *private_data);
967         void *private_data;
968 };
969
970 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
971 {
972         struct traverse_state *state = (struct traverse_state *)private_data;
973         struct db_record *rec;
974         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
975         /* we have to give them a locked record to prevent races */
976         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
977         if (rec && rec->value.dsize > 0) {
978                 state->fn(rec, state->private_data);
979         }
980         talloc_free(tmp_ctx);
981 }
982
983 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
984                                         void *private_data)
985 {
986         struct traverse_state *state = (struct traverse_state *)private_data;
987         struct db_record *rec;
988         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
989         int ret = 0;
990         /* we have to give them a locked record to prevent races */
991         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
992         if (rec && rec->value.dsize > 0) {
993                 ret = state->fn(rec, state->private_data);
994         }
995         talloc_free(tmp_ctx);
996         return ret;
997 }
998
999 static int db_ctdb_traverse(struct db_context *db,
1000                             int (*fn)(struct db_record *rec,
1001                                       void *private_data),
1002                             void *private_data)
1003 {
1004         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1005                                                         struct db_ctdb_ctx);
1006         struct traverse_state state;
1007
1008         state.db = db;
1009         state.fn = fn;
1010         state.private_data = private_data;
1011
1012         if (db->persistent) {
1013                 /* for persistent databases we don't need to do a ctdb traverse,
1014                    we can do a faster local traverse */
1015                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1016         }
1017
1018
1019         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1020         return 0;
1021 }
1022
1023 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1024 {
1025         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1026 }
1027
1028 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1029 {
1030         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1031 }
1032
1033 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1034 {
1035         struct traverse_state *state = (struct traverse_state *)private_data;
1036         struct db_record rec;
1037         rec.key = key;
1038         rec.value = data;
1039         rec.store = db_ctdb_store_deny;
1040         rec.delete_rec = db_ctdb_delete_deny;
1041         rec.private_data = state->db;
1042         state->fn(&rec, state->private_data);
1043 }
1044
1045 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1046                                         void *private_data)
1047 {
1048         struct traverse_state *state = (struct traverse_state *)private_data;
1049         struct db_record rec;
1050         rec.key = kbuf;
1051         rec.value = dbuf;
1052         rec.store = db_ctdb_store_deny;
1053         rec.delete_rec = db_ctdb_delete_deny;
1054         rec.private_data = state->db;
1055
1056         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1057                 /* a deleted record */
1058                 return 0;
1059         }
1060         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1061         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1062
1063         return state->fn(&rec, state->private_data);
1064 }
1065
1066 static int db_ctdb_traverse_read(struct db_context *db,
1067                                  int (*fn)(struct db_record *rec,
1068                                            void *private_data),
1069                                  void *private_data)
1070 {
1071         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1072                                                         struct db_ctdb_ctx);
1073         struct traverse_state state;
1074
1075         state.db = db;
1076         state.fn = fn;
1077         state.private_data = private_data;
1078
1079         if (db->persistent) {
1080                 /* for persistent databases we don't need to do a ctdb traverse,
1081                    we can do a faster local traverse */
1082                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1083         }
1084
1085         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1086         return 0;
1087 }
1088
1089 static int db_ctdb_get_seqnum(struct db_context *db)
1090 {
1091         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1092                                                         struct db_ctdb_ctx);
1093         return tdb_get_seqnum(ctx->wtdb->tdb);
1094 }
1095
1096 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1097                                 const char *name,
1098                                 int hash_size, int tdb_flags,
1099                                 int open_flags, mode_t mode)
1100 {
1101         struct db_context *result;
1102         struct db_ctdb_ctx *db_ctdb;
1103         char *db_path;
1104
1105         if (!lp_clustering()) {
1106                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1107                 return NULL;
1108         }
1109
1110         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1111                 DEBUG(0, ("talloc failed\n"));
1112                 TALLOC_FREE(result);
1113                 return NULL;
1114         }
1115
1116         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1117                 DEBUG(0, ("talloc failed\n"));
1118                 TALLOC_FREE(result);
1119                 return NULL;
1120         }
1121
1122         db_ctdb->transaction = NULL;
1123         db_ctdb->db = result;
1124
1125         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1126                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1127                 TALLOC_FREE(result);
1128                 return NULL;
1129         }
1130
1131         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1132
1133         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1134
1135         /* only pass through specific flags */
1136         tdb_flags &= TDB_SEQNUM;
1137
1138         /* honor permissions if user has specified O_CREAT */
1139         if (open_flags & O_CREAT) {
1140                 chmod(db_path, mode);
1141         }
1142
1143         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1144         if (db_ctdb->wtdb == NULL) {
1145                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1146                 TALLOC_FREE(result);
1147                 return NULL;
1148         }
1149         talloc_free(db_path);
1150
1151         result->private_data = (void *)db_ctdb;
1152         result->fetch_locked = db_ctdb_fetch_locked;
1153         result->fetch = db_ctdb_fetch;
1154         result->traverse = db_ctdb_traverse;
1155         result->traverse_read = db_ctdb_traverse_read;
1156         result->get_seqnum = db_ctdb_get_seqnum;
1157         result->transaction_start = db_ctdb_transaction_start;
1158         result->transaction_commit = db_ctdb_transaction_commit;
1159         result->transaction_cancel = db_ctdb_transaction_cancel;
1160
1161         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1162                  name, db_ctdb->db_id));
1163
1164         return result;
1165 }
1166 #endif