a6bda8e403331c7f8a30f1a6676fd8c908c5e6dc
[jra/samba/.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct tdb_wrap *wtdb;
38         uint32 db_id;
39         struct db_ctdb_transaction_handle *transaction;
40 };
41
42 struct db_ctdb_rec {
43         struct db_ctdb_ctx *ctdb_ctx;
44         struct ctdb_ltdb_header header;
45 };
46
47 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
48                                                TALLOC_CTX *mem_ctx,
49                                                TDB_DATA key,
50                                                bool persistent);
51
52 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
53 {
54         NTSTATUS status;
55         enum TDB_ERROR tret = tdb_error(tdb);
56
57         switch (tret) {
58         case TDB_ERR_EXISTS:
59                 status = NT_STATUS_OBJECT_NAME_COLLISION;
60                 break;
61         case TDB_ERR_NOEXIST:
62                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
63                 break;
64         default:
65                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
66                 break;
67         }
68
69         return status;
70 }
71
72
73
74 /*
75   form a ctdb_rec_data record from a key/data pair
76   
77   note that header may be NULL. If not NULL then it is included in the data portion
78   of the record
79  */
80 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
81                                                   TDB_DATA key, 
82                                                   struct ctdb_ltdb_header *header,
83                                                   TDB_DATA data)
84 {
85         size_t length;
86         struct ctdb_rec_data *d;
87
88         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
89                 data.dsize + (header?sizeof(*header):0);
90         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
91         if (d == NULL) {
92                 return NULL;
93         }
94         d->length = length;
95         d->reqid = reqid;
96         d->keylen = key.dsize;
97         memcpy(&d->data[0], key.dptr, key.dsize);
98         if (header) {
99                 d->datalen = data.dsize + sizeof(*header);
100                 memcpy(&d->data[key.dsize], header, sizeof(*header));
101                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
102         } else {
103                 d->datalen = data.dsize;
104                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
105         }
106         return d;
107 }
108
109
110 /* helper function for marshalling multiple records */
111 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
112                                                struct ctdb_marshall_buffer *m,
113                                                uint64_t db_id,
114                                                uint32_t reqid,
115                                                TDB_DATA key,
116                                                struct ctdb_ltdb_header *header,
117                                                TDB_DATA data)
118 {
119         struct ctdb_rec_data *r;
120         size_t m_size, r_size;
121         struct ctdb_marshall_buffer *m2;
122
123         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
124         if (r == NULL) {
125                 talloc_free(m);
126                 return NULL;
127         }
128
129         if (m == NULL) {
130                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
131                 if (m == NULL) {
132                         return NULL;
133                 }
134                 m->db_id = db_id;
135         }
136
137         m_size = talloc_get_size(m);
138         r_size = talloc_get_size(r);
139
140         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
141         if (m2 == NULL) {
142                 talloc_free(m);
143                 return NULL;
144         }
145
146         memcpy(m_size + (uint8_t *)m2, r, r_size);
147
148         talloc_free(r);
149
150         m2->count++;
151
152         return m2;
153 }
154
155 /* we've finished marshalling, return a data blob with the marshalled records */
156 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
157 {
158         TDB_DATA data;
159         data.dptr = (uint8_t *)m;
160         data.dsize = talloc_get_size(m);
161         return data;
162 }
163
164 /* 
165    loop over a marshalling buffer 
166    
167      - pass r==NULL to start
168      - loop the number of times indicated by m->count
169 */
170 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
171                                                      uint32_t *reqid,
172                                                      struct ctdb_ltdb_header *header,
173                                                      TDB_DATA *key, TDB_DATA *data)
174 {
175         if (r == NULL) {
176                 r = (struct ctdb_rec_data *)&m->data[0];
177         } else {
178                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
179         }
180
181         if (reqid != NULL) {
182                 *reqid = r->reqid;
183         }
184         
185         if (key != NULL) {
186                 key->dptr   = &r->data[0];
187                 key->dsize  = r->keylen;
188         }
189         if (data != NULL) {
190                 data->dptr  = &r->data[r->keylen];
191                 data->dsize = r->datalen;
192                 if (header != NULL) {
193                         data->dptr += sizeof(*header);
194                         data->dsize -= sizeof(*header);
195                 }
196         }
197
198         if (header != NULL) {
199                 if (r->datalen < sizeof(*header)) {
200                         return NULL;
201                 }
202                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
203         }
204
205         return r;
206 }
207
208
209
210 /* start a transaction on a database */
211 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
212 {
213         tdb_transaction_cancel(h->ctx->wtdb->tdb);
214         return 0;
215 }
216
217 /* start a transaction on a database */
218 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
219 {
220         struct db_record *rh;
221         TDB_DATA key;
222         TALLOC_CTX *tmp_ctx;
223         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
224         int ret;
225         struct db_ctdb_ctx *ctx = h->ctx;
226         TDB_DATA data;
227
228         key.dptr = discard_const(keyname);
229         key.dsize = strlen(keyname);
230
231 again:
232         tmp_ctx = talloc_new(h);
233
234         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
235         if (rh == NULL) {
236                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
237                 talloc_free(tmp_ctx);
238                 return -1;
239         }
240         talloc_free(rh);
241
242         ret = tdb_transaction_start(ctx->wtdb->tdb);
243         if (ret != 0) {
244                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
245                 talloc_free(tmp_ctx);
246                 return -1;
247         }
248
249         data = tdb_fetch(ctx->wtdb->tdb, key);
250         if ((data.dptr == NULL) ||
251             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
252             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
253                 SAFE_FREE(data.dptr);
254                 tdb_transaction_cancel(ctx->wtdb->tdb);
255                 talloc_free(tmp_ctx);
256                 goto again;
257         }
258
259         SAFE_FREE(data.dptr);
260         talloc_free(tmp_ctx);
261
262         return 0;
263 }
264
265
266 /* start a transaction on a database */
267 static int db_ctdb_transaction_start(struct db_context *db)
268 {
269         struct db_ctdb_transaction_handle *h;
270         int ret;
271         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
272                                                         struct db_ctdb_ctx);
273
274         if (!db->persistent) {
275                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
276                          ctx->db_id));
277                 return -1;
278         }
279
280         if (ctx->transaction) {
281                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
282                 return -1;
283         }
284
285         h = talloc_zero(db, struct db_ctdb_transaction_handle);
286         if (h == NULL) {
287                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
288                 return -1;
289         }
290
291         h->ctx = ctx;
292
293         ret = db_ctdb_transaction_fetch_start(h);
294         if (ret != 0) {
295                 talloc_free(h);
296                 return -1;
297         }
298
299         talloc_set_destructor(h, db_ctdb_transaction_destructor);
300
301         ctx->transaction = h;
302
303         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
304
305         return 0;
306 }
307
308
309
310 /*
311   fetch a record inside a transaction
312  */
313 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
314                                      TALLOC_CTX *mem_ctx, 
315                                      TDB_DATA key, TDB_DATA *data)
316 {
317         struct db_ctdb_transaction_handle *h = db->transaction;
318
319         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
320
321         if (data->dptr != NULL) {
322                 uint8_t *oldptr = (uint8_t *)data->dptr;
323                 data->dsize -= sizeof(struct ctdb_ltdb_header);
324                 if (data->dsize == 0) {
325                         data->dptr = NULL;
326                 } else {
327                         data->dptr = (uint8 *)
328                                 talloc_memdup(
329                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
330                                         data->dsize);
331                 }
332                 SAFE_FREE(oldptr);
333                 if (data->dptr == NULL && data->dsize != 0) {
334                         return -1;
335                 }
336         }
337
338         if (!h->in_replay) {
339                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
340                 if (h->m_all == NULL) {
341                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
342                         data->dsize = 0;
343                         talloc_free(data->dptr);
344                         return -1;
345                 }
346         }
347
348         return 0;
349 }
350
351
352 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
353 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
354
355 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
356                                                           TALLOC_CTX *mem_ctx,
357                                                           TDB_DATA key)
358 {
359         struct db_record *result;
360         TDB_DATA ctdb_data;
361
362         if (!(result = talloc(mem_ctx, struct db_record))) {
363                 DEBUG(0, ("talloc failed\n"));
364                 return NULL;
365         }
366
367         result->private_data = ctx->transaction;
368
369         result->key.dsize = key.dsize;
370         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
371         if (result->key.dptr == NULL) {
372                 DEBUG(0, ("talloc failed\n"));
373                 TALLOC_FREE(result);
374                 return NULL;
375         }
376
377         result->store = db_ctdb_store_transaction;
378         result->delete_rec = db_ctdb_delete_transaction;
379
380         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
381         if (ctdb_data.dptr == NULL) {
382                 /* create the record */
383                 result->value = tdb_null;
384                 return result;
385         }
386
387         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
388         result->value.dptr = NULL;
389
390         if ((result->value.dsize != 0)
391             && !(result->value.dptr = (uint8 *)talloc_memdup(
392                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
393                          result->value.dsize))) {
394                 DEBUG(0, ("talloc failed\n"));
395                 TALLOC_FREE(result);
396         }
397
398         SAFE_FREE(ctdb_data.dptr);
399
400         return result;
401 }
402
403
404 /*
405   stores a record inside a transaction
406  */
407 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
408                                      TDB_DATA key, TDB_DATA data)
409 {
410         TALLOC_CTX *tmp_ctx = talloc_new(h);
411         int ret;
412         TDB_DATA rec;
413         struct ctdb_ltdb_header header;
414
415         /* we need the header so we can update the RSN */
416         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
417         if (rec.dptr == NULL) {
418                 /* the record doesn't exist - create one with us as dmaster.
419                    This is only safe because we are in a transaction and this
420                    is a persistent database */
421                 ZERO_STRUCT(header);
422                 header.dmaster = get_my_vnn();
423         } else {
424                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
425                 SAFE_FREE(rec.dptr);
426         }
427
428         header.rsn++;
429
430         if (!h->in_replay) {
431                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
432                 if (h->m_all == NULL) {
433                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
434                         talloc_free(tmp_ctx);
435                         return -1;
436                 }
437                 
438                 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
439                 if (h->m_write == NULL) {
440                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
441                         talloc_free(tmp_ctx);
442                         return -1;
443                 }
444         }
445         
446         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
447         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
448         if (rec.dptr == NULL) {
449                 DEBUG(0,(__location__ " Failed to alloc record\n"));
450                 talloc_free(tmp_ctx);
451                 return -1;
452         }
453         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
454         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
455
456         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
457
458         talloc_free(tmp_ctx);
459         
460         return ret;
461 }
462
463
464 /* 
465    a record store inside a transaction
466  */
467 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
468 {
469         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
470                 rec->private_data, struct db_ctdb_transaction_handle);
471         int ret;
472
473         ret = db_ctdb_transaction_store(h, rec->key, data);
474         if (ret != 0) {
475                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
476         }
477         return NT_STATUS_OK;
478 }
479
480 /* 
481    a record delete inside a transaction
482  */
483 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
484 {
485         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
486                 rec->private_data, struct db_ctdb_transaction_handle);
487         int ret;
488
489         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
490         if (ret != 0) {
491                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
492         }
493         return NT_STATUS_OK;
494 }
495
496
497 /*
498   replay a transaction
499  */
500 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
501 {
502         int ret, i;
503         struct ctdb_rec_data *rec = NULL;
504
505         h->in_replay = true;
506
507         ret = db_ctdb_transaction_fetch_start(h);
508         if (ret != 0) {
509                 return ret;
510         }
511
512         for (i=0;i<h->m_all->count;i++) {
513                 TDB_DATA key, data;
514
515                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
516                 if (rec == NULL) {
517                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
518                         goto failed;
519                 }
520
521                 if (rec->reqid == 0) {
522                         /* its a store */
523                         if (db_ctdb_transaction_store(h, key, data) != 0) {
524                                 goto failed;
525                         }
526                 } else {
527                         TDB_DATA data2;
528                         TALLOC_CTX *tmp_ctx = talloc_new(h);
529
530                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
531                                 talloc_free(tmp_ctx);
532                                 goto failed;
533                         }
534                         if (data2.dsize != data.dsize ||
535                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
536                                 /* the record has changed on us - we have to give up */
537                                 talloc_free(tmp_ctx);
538                                 goto failed;
539                         }
540                         talloc_free(tmp_ctx);
541                 }
542         }
543         
544         return 0;
545
546 failed:
547         tdb_transaction_cancel(h->ctx->wtdb->tdb);
548         return -1;
549 }
550
551
552 /*
553   commit a transaction
554  */
555 static int db_ctdb_transaction_commit(struct db_context *db)
556 {
557         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
558                                                         struct db_ctdb_ctx);
559         NTSTATUS rets;
560         int ret;
561         int status;
562         struct db_ctdb_transaction_handle *h = ctx->transaction;
563
564         if (h == NULL) {
565                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
566                 return -1;
567         }
568
569         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
570
571         if (h->m_write == NULL) {
572                 /* no changes were made */
573                 talloc_free(h);
574                 ctx->transaction = NULL;
575                 return 0;
576         }
577
578         talloc_set_destructor(h, NULL);
579
580         /* our commit strategy is quite complex.
581
582            - we first try to commit the changes to all other nodes
583
584            - if that works, then we commit locally and we are done
585
586            - if a commit on another node fails, then we need to cancel
587              the transaction, then restart the transaction (thus
588              opening a window of time for a pending recovery to
589              complete), then replay the transaction, checking all the
590              reads and writes (checking that reads give the same data,
591              and writes succeed). Then we retry the transaction to the
592              other nodes
593         */
594
595 again:
596         /* tell ctdbd to commit to the other nodes */
597         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
598                                   CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
599                                   db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
600         if (!NT_STATUS_IS_OK(rets) || status != 0) {
601                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
602                 sleep(1);
603                 if (ctdb_replay_transaction(h) != 0) {
604                         DEBUG(0,(__location__ " Failed to replay transaction\n"));
605                         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
606                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
607                                             tdb_null, NULL, NULL, NULL);
608                         h->ctx->transaction = NULL;
609                         talloc_free(h);
610                         ctx->transaction = NULL;
611                         return -1;
612                 }
613                 goto again;
614         }
615
616         /* do the real commit locally */
617         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
618         if (ret != 0) {
619                 DEBUG(0,(__location__ " Failed to commit transaction\n"));
620                 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id, 
621                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
622                 h->ctx->transaction = NULL;
623                 talloc_free(h);
624                 return ret;
625         }
626
627         /* tell ctdbd that we are finished with our local commit */
628         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
629                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
630                             tdb_null, NULL, NULL, NULL);
631         h->ctx->transaction = NULL;
632         talloc_free(h);
633         return 0;
634 }
635
636
637 /*
638   cancel a transaction
639  */
640 static int db_ctdb_transaction_cancel(struct db_context *db)
641 {
642         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
643                                                         struct db_ctdb_ctx);
644         struct db_ctdb_transaction_handle *h = ctx->transaction;
645
646         if (h == NULL) {
647                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
648                 return -1;
649         }
650
651         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
652
653         ctx->transaction = NULL;
654         talloc_free(h);
655         return 0;
656 }
657
658
659 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
660 {
661         struct db_ctdb_rec *crec = talloc_get_type_abort(
662                 rec->private_data, struct db_ctdb_rec);
663         TDB_DATA cdata;
664         int ret;
665
666         cdata.dsize = sizeof(crec->header) + data.dsize;
667
668         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
669                 return NT_STATUS_NO_MEMORY;
670         }
671
672         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
673         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
674
675         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
676
677         SAFE_FREE(cdata.dptr);
678
679         return (ret == 0) ? NT_STATUS_OK
680                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
681 }
682
683
684 /* for persistent databases the store is a bit different. We have to
685    ask the ctdb daemon to push the record to all nodes after the
686    store */
687 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
688 {
689         struct db_ctdb_rec *crec;
690         struct db_record *record;
691         TDB_DATA cdata;
692         int ret;
693         NTSTATUS status;
694         uint32_t count;
695         int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
696
697         for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
698              (count < max_retries) && !NT_STATUS_IS_OK(status);
699              count++)
700         {
701                 if (count > 0) {
702                         /* retry */
703                         /*
704                          * There is a hack here: We use rec as a memory
705                          * context and re-use it as the record struct ptr.
706                          * We don't free the record data allocated
707                          * in each turn. So all gets freed when the caller
708                          * releases the original record. This is because
709                          * we don't get the record passed in by reference
710                          * in the first place and the caller relies on
711                          * having to free the record himself.
712                          */
713                         record = fetch_locked_internal(crec->ctdb_ctx,
714                                                        rec,
715                                                        rec->key,
716                                                        true /* persistent */);
717                         if (record == NULL) {
718                                 DEBUG(5, ("fetch_locked_internal failed.\n"));
719                                 status = NT_STATUS_NO_MEMORY;
720                                 break;
721                         }
722                 }
723
724                 crec = talloc_get_type_abort(record->private_data,
725                                              struct db_ctdb_rec);
726
727                 cdata.dsize = sizeof(crec->header) + data.dsize;
728
729                 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
730                         return NT_STATUS_NO_MEMORY;
731                 }
732
733                 crec->header.rsn++;
734
735                 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
736                 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
737
738                 status = ctdbd_start_persistent_update(
739                                 messaging_ctdbd_connection(),
740                                 crec->ctdb_ctx->db_id,
741                                 rec->key,
742                                 cdata);
743
744                 if (NT_STATUS_IS_OK(status)) {
745                         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
746                                         cdata, TDB_REPLACE);
747                         status = (ret == 0)
748                                         ? NT_STATUS_OK
749                                         : tdb_error_to_ntstatus(
750                                                 crec->ctdb_ctx->wtdb->tdb);
751                 }
752
753                 /*
754                  * release the lock *now* in order to prevent deadlocks.
755                  *
756                  * There is a tradeoff: Usually, the record is still locked
757                  * after db->store operation. This lock is usually released
758                  * via the talloc destructor with the TALLOC_FREE to
759                  * the record. So we have two choices:
760                  *
761                  * - Either re-lock the record after the call to persistent_store
762                  *   or cancel_persistent update and this way not changing any
763                  *   assumptions callers may have about the state, but possibly
764                  *   introducing new race conditions.
765                  *
766                  * - Or don't lock the record again but just remove the
767                  *   talloc_destructor. This is less racy but assumes that
768                  *   the lock is always released via TALLOC_FREE of the record.
769                  *
770                  * I choose the first variant for now since it seems less racy.
771                  * We can't guarantee that we succeed in getting the lock
772                  * anyways. The only real danger here is that a caller
773                  * performs multiple store operations after a fetch_locked()
774                  * which is currently not the case.
775                  */
776                 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
777                 talloc_set_destructor(record, NULL);
778
779                 /* now tell ctdbd to update this record on all other nodes */
780                 if (NT_STATUS_IS_OK(status)) {
781                         status = ctdbd_persistent_store(
782                                         messaging_ctdbd_connection(),
783                                         crec->ctdb_ctx->db_id,
784                                         rec->key,
785                                         cdata);
786                 } else {
787                         ctdbd_cancel_persistent_update(
788                                         messaging_ctdbd_connection(),
789                                         crec->ctdb_ctx->db_id,
790                                         rec->key,
791                                         cdata);
792                         break;
793                 }
794
795                 SAFE_FREE(cdata.dptr);
796         } /* retry-loop */
797
798         if (!NT_STATUS_IS_OK(status)) {
799                 DEBUG(5, ("ctdbd_persistent_store failed after "
800                           "%d retries with error %s - giving up.\n",
801                           count, nt_errstr(status)));
802         }
803
804         SAFE_FREE(cdata.dptr);
805
806         return status;
807 }
808
809 static NTSTATUS db_ctdb_delete(struct db_record *rec)
810 {
811         TDB_DATA data;
812
813         /*
814          * We have to store the header with empty data. TODO: Fix the
815          * tdb-level cleanup
816          */
817
818         ZERO_STRUCT(data);
819
820         return db_ctdb_store(rec, data, 0);
821
822 }
823
824 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
825 {
826         TDB_DATA data;
827
828         /*
829          * We have to store the header with empty data. TODO: Fix the
830          * tdb-level cleanup
831          */
832
833         ZERO_STRUCT(data);
834
835         return db_ctdb_store_persistent(rec, data, 0);
836
837 }
838
839 static int db_ctdb_record_destr(struct db_record* data)
840 {
841         struct db_ctdb_rec *crec = talloc_get_type_abort(
842                 data->private_data, struct db_ctdb_rec);
843
844         DEBUG(10, (DEBUGLEVEL > 10
845                    ? "Unlocking db %u key %s\n"
846                    : "Unlocking db %u key %.20s\n",
847                    (int)crec->ctdb_ctx->db_id,
848                    hex_encode(data, (unsigned char *)data->key.dptr,
849                               data->key.dsize)));
850
851         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
852                 DEBUG(0, ("tdb_chainunlock failed\n"));
853                 return -1;
854         }
855
856         return 0;
857 }
858
859 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
860                                                TALLOC_CTX *mem_ctx,
861                                                TDB_DATA key,
862                                                bool persistent)
863 {
864         struct db_record *result;
865         struct db_ctdb_rec *crec;
866         NTSTATUS status;
867         TDB_DATA ctdb_data;
868         int migrate_attempts = 0;
869
870         if (ctx->transaction != NULL) {
871                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
872         }
873
874         if (!(result = talloc(mem_ctx, struct db_record))) {
875                 DEBUG(0, ("talloc failed\n"));
876                 return NULL;
877         }
878
879         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
880                 DEBUG(0, ("talloc failed\n"));
881                 TALLOC_FREE(result);
882                 return NULL;
883         }
884
885         result->private_data = (void *)crec;
886         crec->ctdb_ctx = ctx;
887
888         result->key.dsize = key.dsize;
889         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
890         if (result->key.dptr == NULL) {
891                 DEBUG(0, ("talloc failed\n"));
892                 TALLOC_FREE(result);
893                 return NULL;
894         }
895
896         /*
897          * Do a blocking lock on the record
898          */
899 again:
900
901         if (DEBUGLEVEL >= 10) {
902                 char *keystr = hex_encode(result, key.dptr, key.dsize);
903                 DEBUG(10, (DEBUGLEVEL > 10
904                            ? "Locking db %u key %s\n"
905                            : "Locking db %u key %.20s\n",
906                            (int)crec->ctdb_ctx->db_id, keystr));
907                 TALLOC_FREE(keystr);
908         }
909         
910         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
911                 DEBUG(3, ("tdb_chainlock failed\n"));
912                 TALLOC_FREE(result);
913                 return NULL;
914         }
915
916         if (persistent) {
917                 result->store = db_ctdb_store_persistent;
918                 result->delete_rec = db_ctdb_delete_persistent;
919         } else {
920                 result->store = db_ctdb_store;
921                 result->delete_rec = db_ctdb_delete;
922         }
923         talloc_set_destructor(result, db_ctdb_record_destr);
924
925         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
926
927         /*
928          * See if we have a valid record and we are the dmaster. If so, we can
929          * take the shortcut and just return it.
930          */
931
932         if ((ctdb_data.dptr == NULL) ||
933             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
934             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
935 #if 0
936             || (random() % 2 != 0)
937 #endif
938 ) {
939                 SAFE_FREE(ctdb_data.dptr);
940                 tdb_chainunlock(ctx->wtdb->tdb, key);
941                 talloc_set_destructor(result, NULL);
942
943                 migrate_attempts += 1;
944
945                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
946                            ctdb_data.dptr, ctdb_data.dptr ?
947                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
948                            get_my_vnn()));
949
950                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
951                 if (!NT_STATUS_IS_OK(status)) {
952                         DEBUG(5, ("ctdb_migrate failed: %s\n",
953                                   nt_errstr(status)));
954                         TALLOC_FREE(result);
955                         return NULL;
956                 }
957                 /* now its migrated, try again */
958                 goto again;
959         }
960
961         if (migrate_attempts > 10) {
962                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
963                           migrate_attempts));
964         }
965
966         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
967
968         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
969         result->value.dptr = NULL;
970
971         if ((result->value.dsize != 0)
972             && !(result->value.dptr = (uint8 *)talloc_memdup(
973                          result, ctdb_data.dptr + sizeof(crec->header),
974                          result->value.dsize))) {
975                 DEBUG(0, ("talloc failed\n"));
976                 TALLOC_FREE(result);
977         }
978
979         SAFE_FREE(ctdb_data.dptr);
980
981         return result;
982 }
983
984 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
985                                               TALLOC_CTX *mem_ctx,
986                                               TDB_DATA key)
987 {
988         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
989                                                         struct db_ctdb_ctx);
990
991         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
992 }
993
994 /*
995   fetch (unlocked, no migration) operation on ctdb
996  */
997 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
998                          TDB_DATA key, TDB_DATA *data)
999 {
1000         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1001                                                         struct db_ctdb_ctx);
1002         NTSTATUS status;
1003         TDB_DATA ctdb_data;
1004
1005         if (ctx->transaction) {
1006                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1007         }
1008
1009         /* try a direct fetch */
1010         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1011
1012         /*
1013          * See if we have a valid record and we are the dmaster. If so, we can
1014          * take the shortcut and just return it.
1015          * we bypass the dmaster check for persistent databases
1016          */
1017         if ((ctdb_data.dptr != NULL) &&
1018             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1019             (db->persistent ||
1020              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1021                 /* we are the dmaster - avoid the ctdb protocol op */
1022
1023                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1024                 if (data->dsize == 0) {
1025                         SAFE_FREE(ctdb_data.dptr);
1026                         data->dptr = NULL;
1027                         return 0;
1028                 }
1029
1030                 data->dptr = (uint8 *)talloc_memdup(
1031                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1032                         data->dsize);
1033
1034                 SAFE_FREE(ctdb_data.dptr);
1035
1036                 if (data->dptr == NULL) {
1037                         return -1;
1038                 }
1039                 return 0;
1040         }
1041
1042         SAFE_FREE(ctdb_data.dptr);
1043
1044         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1045         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1046         if (!NT_STATUS_IS_OK(status)) {
1047                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1048                 return -1;
1049         }
1050
1051         return 0;
1052 }
1053
1054 struct traverse_state {
1055         struct db_context *db;
1056         int (*fn)(struct db_record *rec, void *private_data);
1057         void *private_data;
1058 };
1059
1060 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1061 {
1062         struct traverse_state *state = (struct traverse_state *)private_data;
1063         struct db_record *rec;
1064         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1065         /* we have to give them a locked record to prevent races */
1066         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1067         if (rec && rec->value.dsize > 0) {
1068                 state->fn(rec, state->private_data);
1069         }
1070         talloc_free(tmp_ctx);
1071 }
1072
1073 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1074                                         void *private_data)
1075 {
1076         struct traverse_state *state = (struct traverse_state *)private_data;
1077         struct db_record *rec;
1078         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1079         int ret = 0;
1080         /* we have to give them a locked record to prevent races */
1081         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1082         if (rec && rec->value.dsize > 0) {
1083                 ret = state->fn(rec, state->private_data);
1084         }
1085         talloc_free(tmp_ctx);
1086         return ret;
1087 }
1088
1089 static int db_ctdb_traverse(struct db_context *db,
1090                             int (*fn)(struct db_record *rec,
1091                                       void *private_data),
1092                             void *private_data)
1093 {
1094         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1095                                                         struct db_ctdb_ctx);
1096         struct traverse_state state;
1097
1098         state.db = db;
1099         state.fn = fn;
1100         state.private_data = private_data;
1101
1102         if (db->persistent) {
1103                 /* for persistent databases we don't need to do a ctdb traverse,
1104                    we can do a faster local traverse */
1105                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1106         }
1107
1108
1109         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1110         return 0;
1111 }
1112
1113 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1114 {
1115         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1116 }
1117
1118 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1119 {
1120         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1121 }
1122
1123 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1124 {
1125         struct traverse_state *state = (struct traverse_state *)private_data;
1126         struct db_record rec;
1127         rec.key = key;
1128         rec.value = data;
1129         rec.store = db_ctdb_store_deny;
1130         rec.delete_rec = db_ctdb_delete_deny;
1131         rec.private_data = state->db;
1132         state->fn(&rec, state->private_data);
1133 }
1134
1135 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1136                                         void *private_data)
1137 {
1138         struct traverse_state *state = (struct traverse_state *)private_data;
1139         struct db_record rec;
1140         rec.key = kbuf;
1141         rec.value = dbuf;
1142         rec.store = db_ctdb_store_deny;
1143         rec.delete_rec = db_ctdb_delete_deny;
1144         rec.private_data = state->db;
1145
1146         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1147                 /* a deleted record */
1148                 return 0;
1149         }
1150         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1151         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1152
1153         return state->fn(&rec, state->private_data);
1154 }
1155
1156 static int db_ctdb_traverse_read(struct db_context *db,
1157                                  int (*fn)(struct db_record *rec,
1158                                            void *private_data),
1159                                  void *private_data)
1160 {
1161         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1162                                                         struct db_ctdb_ctx);
1163         struct traverse_state state;
1164
1165         state.db = db;
1166         state.fn = fn;
1167         state.private_data = private_data;
1168
1169         if (db->persistent) {
1170                 /* for persistent databases we don't need to do a ctdb traverse,
1171                    we can do a faster local traverse */
1172                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1173         }
1174
1175         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1176         return 0;
1177 }
1178
1179 static int db_ctdb_get_seqnum(struct db_context *db)
1180 {
1181         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1182                                                         struct db_ctdb_ctx);
1183         return tdb_get_seqnum(ctx->wtdb->tdb);
1184 }
1185
1186 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1187                                 const char *name,
1188                                 int hash_size, int tdb_flags,
1189                                 int open_flags, mode_t mode)
1190 {
1191         struct db_context *result;
1192         struct db_ctdb_ctx *db_ctdb;
1193         char *db_path;
1194
1195         if (!lp_clustering()) {
1196                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1197                 return NULL;
1198         }
1199
1200         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1201                 DEBUG(0, ("talloc failed\n"));
1202                 TALLOC_FREE(result);
1203                 return NULL;
1204         }
1205
1206         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1207                 DEBUG(0, ("talloc failed\n"));
1208                 TALLOC_FREE(result);
1209                 return NULL;
1210         }
1211
1212         db_ctdb->transaction = NULL;
1213
1214         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1215                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1216                 TALLOC_FREE(result);
1217                 return NULL;
1218         }
1219
1220         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1221
1222         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1223
1224         /* only pass through specific flags */
1225         tdb_flags &= TDB_SEQNUM;
1226
1227         /* honor permissions if user has specified O_CREAT */
1228         if (open_flags & O_CREAT) {
1229                 chmod(db_path, mode);
1230         }
1231
1232         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1233         if (db_ctdb->wtdb == NULL) {
1234                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1235                 TALLOC_FREE(result);
1236                 return NULL;
1237         }
1238         talloc_free(db_path);
1239
1240         result->private_data = (void *)db_ctdb;
1241         result->fetch_locked = db_ctdb_fetch_locked;
1242         result->fetch = db_ctdb_fetch;
1243         result->traverse = db_ctdb_traverse;
1244         result->traverse_read = db_ctdb_traverse_read;
1245         result->get_seqnum = db_ctdb_get_seqnum;
1246         result->transaction_start = db_ctdb_transaction_start;
1247         result->transaction_commit = db_ctdb_transaction_commit;
1248         result->transaction_cancel = db_ctdb_transaction_cancel;
1249
1250         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1251                  name, db_ctdb->db_id));
1252
1253         return result;
1254 }
1255 #endif