we need to commit, not cancel, on record destruction
[ira/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct db_context *db;
38         struct tdb_wrap *wtdb;
39         uint32 db_id;
40         struct db_ctdb_transaction_handle *transaction;
41 };
42
43 struct db_ctdb_rec {
44         struct db_ctdb_ctx *ctdb_ctx;
45         struct ctdb_ltdb_header header;
46 };
47
48 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
49                                                TALLOC_CTX *mem_ctx,
50                                                TDB_DATA key,
51                                                bool persistent);
52
53 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
54 {
55         NTSTATUS status;
56         enum TDB_ERROR tret = tdb_error(tdb);
57
58         switch (tret) {
59         case TDB_ERR_EXISTS:
60                 status = NT_STATUS_OBJECT_NAME_COLLISION;
61                 break;
62         case TDB_ERR_NOEXIST:
63                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
64                 break;
65         default:
66                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
67                 break;
68         }
69
70         return status;
71 }
72
73
74
75 /*
76   form a ctdb_rec_data record from a key/data pair
77   
78   note that header may be NULL. If not NULL then it is included in the data portion
79   of the record
80  */
81 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
82                                                   TDB_DATA key, 
83                                                   struct ctdb_ltdb_header *header,
84                                                   TDB_DATA data)
85 {
86         size_t length;
87         struct ctdb_rec_data *d;
88
89         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
90                 data.dsize + (header?sizeof(*header):0);
91         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
92         if (d == NULL) {
93                 return NULL;
94         }
95         d->length = length;
96         d->reqid = reqid;
97         d->keylen = key.dsize;
98         memcpy(&d->data[0], key.dptr, key.dsize);
99         if (header) {
100                 d->datalen = data.dsize + sizeof(*header);
101                 memcpy(&d->data[key.dsize], header, sizeof(*header));
102                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
103         } else {
104                 d->datalen = data.dsize;
105                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
106         }
107         return d;
108 }
109
110
111 /* helper function for marshalling multiple records */
112 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
113                                                struct ctdb_marshall_buffer *m,
114                                                uint64_t db_id,
115                                                uint32_t reqid,
116                                                TDB_DATA key,
117                                                struct ctdb_ltdb_header *header,
118                                                TDB_DATA data)
119 {
120         struct ctdb_rec_data *r;
121         size_t m_size, r_size;
122         struct ctdb_marshall_buffer *m2;
123
124         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
125         if (r == NULL) {
126                 talloc_free(m);
127                 return NULL;
128         }
129
130         if (m == NULL) {
131                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
132                 if (m == NULL) {
133                         return NULL;
134                 }
135                 m->db_id = db_id;
136         }
137
138         m_size = talloc_get_size(m);
139         r_size = talloc_get_size(r);
140
141         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
142         if (m2 == NULL) {
143                 talloc_free(m);
144                 return NULL;
145         }
146
147         memcpy(m_size + (uint8_t *)m2, r, r_size);
148
149         talloc_free(r);
150
151         m2->count++;
152
153         return m2;
154 }
155
156 /* we've finished marshalling, return a data blob with the marshalled records */
157 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
158 {
159         TDB_DATA data;
160         data.dptr = (uint8_t *)m;
161         data.dsize = talloc_get_size(m);
162         return data;
163 }
164
165 /* 
166    loop over a marshalling buffer 
167    
168      - pass r==NULL to start
169      - loop the number of times indicated by m->count
170 */
171 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
172                                                      uint32_t *reqid,
173                                                      struct ctdb_ltdb_header *header,
174                                                      TDB_DATA *key, TDB_DATA *data)
175 {
176         if (r == NULL) {
177                 r = (struct ctdb_rec_data *)&m->data[0];
178         } else {
179                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
180         }
181
182         if (reqid != NULL) {
183                 *reqid = r->reqid;
184         }
185         
186         if (key != NULL) {
187                 key->dptr   = &r->data[0];
188                 key->dsize  = r->keylen;
189         }
190         if (data != NULL) {
191                 data->dptr  = &r->data[r->keylen];
192                 data->dsize = r->datalen;
193                 if (header != NULL) {
194                         data->dptr += sizeof(*header);
195                         data->dsize -= sizeof(*header);
196                 }
197         }
198
199         if (header != NULL) {
200                 if (r->datalen < sizeof(*header)) {
201                         return NULL;
202                 }
203                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
204         }
205
206         return r;
207 }
208
209
210
211 /* start a transaction on a database */
212 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
213 {
214         tdb_transaction_cancel(h->ctx->wtdb->tdb);
215         return 0;
216 }
217
218 /* start a transaction on a database */
219 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
220 {
221         struct db_record *rh;
222         TDB_DATA key;
223         TALLOC_CTX *tmp_ctx;
224         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
225         int ret;
226         struct db_ctdb_ctx *ctx = h->ctx;
227         TDB_DATA data;
228
229         key.dptr = discard_const(keyname);
230         key.dsize = strlen(keyname);
231
232 again:
233         tmp_ctx = talloc_new(h);
234
235         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
236         if (rh == NULL) {
237                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
238                 talloc_free(tmp_ctx);
239                 return -1;
240         }
241         talloc_free(rh);
242
243         ret = tdb_transaction_start(ctx->wtdb->tdb);
244         if (ret != 0) {
245                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
246                 talloc_free(tmp_ctx);
247                 return -1;
248         }
249
250         data = tdb_fetch(ctx->wtdb->tdb, key);
251         if ((data.dptr == NULL) ||
252             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
253             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
254                 SAFE_FREE(data.dptr);
255                 tdb_transaction_cancel(ctx->wtdb->tdb);
256                 talloc_free(tmp_ctx);
257                 goto again;
258         }
259
260         SAFE_FREE(data.dptr);
261         talloc_free(tmp_ctx);
262
263         return 0;
264 }
265
266
267 /* start a transaction on a database */
268 static int db_ctdb_transaction_start(struct db_context *db)
269 {
270         struct db_ctdb_transaction_handle *h;
271         int ret;
272         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
273                                                         struct db_ctdb_ctx);
274
275         if (!db->persistent) {
276                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
277                          ctx->db_id));
278                 return -1;
279         }
280
281         if (ctx->transaction) {
282                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
283                 return -1;
284         }
285
286         h = talloc_zero(db, struct db_ctdb_transaction_handle);
287         if (h == NULL) {
288                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
289                 return -1;
290         }
291
292         h->ctx = ctx;
293
294         ret = db_ctdb_transaction_fetch_start(h);
295         if (ret != 0) {
296                 talloc_free(h);
297                 return -1;
298         }
299
300         talloc_set_destructor(h, db_ctdb_transaction_destructor);
301
302         ctx->transaction = h;
303
304         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
305
306         return 0;
307 }
308
309
310
311 /*
312   fetch a record inside a transaction
313  */
314 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
315                                      TALLOC_CTX *mem_ctx, 
316                                      TDB_DATA key, TDB_DATA *data)
317 {
318         struct db_ctdb_transaction_handle *h = db->transaction;
319
320         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
321
322         if (data->dptr != NULL) {
323                 uint8_t *oldptr = (uint8_t *)data->dptr;
324                 data->dsize -= sizeof(struct ctdb_ltdb_header);
325                 if (data->dsize == 0) {
326                         data->dptr = NULL;
327                 } else {
328                         data->dptr = (uint8 *)
329                                 talloc_memdup(
330                                         mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
331                                         data->dsize);
332                 }
333                 SAFE_FREE(oldptr);
334                 if (data->dptr == NULL && data->dsize != 0) {
335                         return -1;
336                 }
337         }
338
339         if (!h->in_replay) {
340                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
341                 if (h->m_all == NULL) {
342                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
343                         data->dsize = 0;
344                         talloc_free(data->dptr);
345                         return -1;
346                 }
347         }
348
349         return 0;
350 }
351
352
353 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
354 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
355
356 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
357                                                           TALLOC_CTX *mem_ctx,
358                                                           TDB_DATA key)
359 {
360         struct db_record *result;
361         TDB_DATA ctdb_data;
362
363         if (!(result = talloc(mem_ctx, struct db_record))) {
364                 DEBUG(0, ("talloc failed\n"));
365                 return NULL;
366         }
367
368         result->private_data = ctx->transaction;
369
370         result->key.dsize = key.dsize;
371         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
372         if (result->key.dptr == NULL) {
373                 DEBUG(0, ("talloc failed\n"));
374                 TALLOC_FREE(result);
375                 return NULL;
376         }
377
378         result->store = db_ctdb_store_transaction;
379         result->delete_rec = db_ctdb_delete_transaction;
380
381         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
382         if (ctdb_data.dptr == NULL) {
383                 /* create the record */
384                 result->value = tdb_null;
385                 return result;
386         }
387
388         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
389         result->value.dptr = NULL;
390
391         if ((result->value.dsize != 0)
392             && !(result->value.dptr = (uint8 *)talloc_memdup(
393                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
394                          result->value.dsize))) {
395                 DEBUG(0, ("talloc failed\n"));
396                 TALLOC_FREE(result);
397         }
398
399         SAFE_FREE(ctdb_data.dptr);
400
401         return result;
402 }
403
404 static int db_ctdb_record_destructor(struct db_record *rec)
405 {
406         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
407                 rec->private_data, struct db_ctdb_transaction_handle);
408         int ret = h->ctx->db->transaction_commit(h->ctx->db);
409         if (ret != 0) {
410                 DEBUG(0,(__location__ " transaction_commit failed\n"));
411         }
412         return 0;
413 }
414
415 /*
416   auto-create a transaction for persistent databases
417  */
418 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
419                                                          TALLOC_CTX *mem_ctx,
420                                                          TDB_DATA key)
421 {
422         int res;
423         struct db_record *rec;
424
425         res = db_ctdb_transaction_start(ctx->db);
426         if (res == -1) {
427                 return NULL;
428         }
429
430         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
431         if (rec == NULL) {
432                 ctx->db->transaction_cancel(ctx->db);           
433                 return NULL;
434         }
435
436         /* destroy this transaction when we release the lock */
437         talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
438         return rec;
439 }
440
441
442 /*
443   stores a record inside a transaction
444  */
445 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
446                                      TDB_DATA key, TDB_DATA data)
447 {
448         TALLOC_CTX *tmp_ctx = talloc_new(h);
449         int ret;
450         TDB_DATA rec;
451         struct ctdb_ltdb_header header;
452
453         /* we need the header so we can update the RSN */
454         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
455         if (rec.dptr == NULL) {
456                 /* the record doesn't exist - create one with us as dmaster.
457                    This is only safe because we are in a transaction and this
458                    is a persistent database */
459                 ZERO_STRUCT(header);
460                 header.dmaster = get_my_vnn();
461         } else {
462                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
463                 SAFE_FREE(rec.dptr);
464         }
465
466         header.rsn++;
467
468         if (!h->in_replay) {
469                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
470                 if (h->m_all == NULL) {
471                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
472                         talloc_free(tmp_ctx);
473                         return -1;
474                 }
475                 
476                 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
477                 if (h->m_write == NULL) {
478                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
479                         talloc_free(tmp_ctx);
480                         return -1;
481                 }
482         }
483         
484         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
485         rec.dptr = talloc_size(tmp_ctx, rec.dsize);
486         if (rec.dptr == NULL) {
487                 DEBUG(0,(__location__ " Failed to alloc record\n"));
488                 talloc_free(tmp_ctx);
489                 return -1;
490         }
491         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
492         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
493
494         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
495
496         talloc_free(tmp_ctx);
497         
498         return ret;
499 }
500
501
502 /* 
503    a record store inside a transaction
504  */
505 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
506 {
507         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
508                 rec->private_data, struct db_ctdb_transaction_handle);
509         int ret;
510
511         ret = db_ctdb_transaction_store(h, rec->key, data);
512         if (ret != 0) {
513                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
514         }
515         return NT_STATUS_OK;
516 }
517
518 /* 
519    a record delete inside a transaction
520  */
521 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
522 {
523         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
524                 rec->private_data, struct db_ctdb_transaction_handle);
525         int ret;
526
527         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
528         if (ret != 0) {
529                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
530         }
531         return NT_STATUS_OK;
532 }
533
534
535 /*
536   replay a transaction
537  */
538 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
539 {
540         int ret, i;
541         struct ctdb_rec_data *rec = NULL;
542
543         h->in_replay = true;
544
545         ret = db_ctdb_transaction_fetch_start(h);
546         if (ret != 0) {
547                 return ret;
548         }
549
550         for (i=0;i<h->m_all->count;i++) {
551                 TDB_DATA key, data;
552
553                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
554                 if (rec == NULL) {
555                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
556                         goto failed;
557                 }
558
559                 if (rec->reqid == 0) {
560                         /* its a store */
561                         if (db_ctdb_transaction_store(h, key, data) != 0) {
562                                 goto failed;
563                         }
564                 } else {
565                         TDB_DATA data2;
566                         TALLOC_CTX *tmp_ctx = talloc_new(h);
567
568                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
569                                 talloc_free(tmp_ctx);
570                                 goto failed;
571                         }
572                         if (data2.dsize != data.dsize ||
573                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
574                                 /* the record has changed on us - we have to give up */
575                                 talloc_free(tmp_ctx);
576                                 goto failed;
577                         }
578                         talloc_free(tmp_ctx);
579                 }
580         }
581         
582         return 0;
583
584 failed:
585         tdb_transaction_cancel(h->ctx->wtdb->tdb);
586         return -1;
587 }
588
589
590 /*
591   commit a transaction
592  */
593 static int db_ctdb_transaction_commit(struct db_context *db)
594 {
595         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
596                                                         struct db_ctdb_ctx);
597         NTSTATUS rets;
598         int ret;
599         int status;
600         struct db_ctdb_transaction_handle *h = ctx->transaction;
601
602         if (h == NULL) {
603                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
604                 return -1;
605         }
606
607         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
608
609         if (h->m_write == NULL) {
610                 /* no changes were made */
611                 talloc_free(h);
612                 ctx->transaction = NULL;
613                 return 0;
614         }
615
616         talloc_set_destructor(h, NULL);
617
618         /* our commit strategy is quite complex.
619
620            - we first try to commit the changes to all other nodes
621
622            - if that works, then we commit locally and we are done
623
624            - if a commit on another node fails, then we need to cancel
625              the transaction, then restart the transaction (thus
626              opening a window of time for a pending recovery to
627              complete), then replay the transaction, checking all the
628              reads and writes (checking that reads give the same data,
629              and writes succeed). Then we retry the transaction to the
630              other nodes
631         */
632
633 again:
634         /* tell ctdbd to commit to the other nodes */
635         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
636                                   CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
637                                   db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
638         if (!NT_STATUS_IS_OK(rets) || status != 0) {
639                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
640                 sleep(1);
641                 if (ctdb_replay_transaction(h) != 0) {
642                         DEBUG(0,(__location__ " Failed to replay transaction\n"));
643                         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
644                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
645                                             tdb_null, NULL, NULL, NULL);
646                         h->ctx->transaction = NULL;
647                         talloc_free(h);
648                         ctx->transaction = NULL;
649                         return -1;
650                 }
651                 goto again;
652         }
653
654         /* do the real commit locally */
655         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
656         if (ret != 0) {
657                 DEBUG(0,(__location__ " Failed to commit transaction\n"));
658                 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id, 
659                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
660                 h->ctx->transaction = NULL;
661                 talloc_free(h);
662                 return ret;
663         }
664
665         /* tell ctdbd that we are finished with our local commit */
666         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
667                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
668                             tdb_null, NULL, NULL, NULL);
669         h->ctx->transaction = NULL;
670         talloc_free(h);
671         return 0;
672 }
673
674
675 /*
676   cancel a transaction
677  */
678 static int db_ctdb_transaction_cancel(struct db_context *db)
679 {
680         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
681                                                         struct db_ctdb_ctx);
682         struct db_ctdb_transaction_handle *h = ctx->transaction;
683
684         if (h == NULL) {
685                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
686                 return -1;
687         }
688
689         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
690
691         ctx->transaction = NULL;
692         talloc_free(h);
693         return 0;
694 }
695
696
697 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
698 {
699         struct db_ctdb_rec *crec = talloc_get_type_abort(
700                 rec->private_data, struct db_ctdb_rec);
701         TDB_DATA cdata;
702         int ret;
703
704         cdata.dsize = sizeof(crec->header) + data.dsize;
705
706         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
707                 return NT_STATUS_NO_MEMORY;
708         }
709
710         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
711         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
712
713         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
714
715         SAFE_FREE(cdata.dptr);
716
717         return (ret == 0) ? NT_STATUS_OK
718                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
719 }
720
721
722
723 static NTSTATUS db_ctdb_delete(struct db_record *rec)
724 {
725         TDB_DATA data;
726
727         /*
728          * We have to store the header with empty data. TODO: Fix the
729          * tdb-level cleanup
730          */
731
732         ZERO_STRUCT(data);
733
734         return db_ctdb_store(rec, data, 0);
735
736 }
737
738 static int db_ctdb_record_destr(struct db_record* data)
739 {
740         struct db_ctdb_rec *crec = talloc_get_type_abort(
741                 data->private_data, struct db_ctdb_rec);
742
743         DEBUG(10, (DEBUGLEVEL > 10
744                    ? "Unlocking db %u key %s\n"
745                    : "Unlocking db %u key %.20s\n",
746                    (int)crec->ctdb_ctx->db_id,
747                    hex_encode(data, (unsigned char *)data->key.dptr,
748                               data->key.dsize)));
749
750         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
751                 DEBUG(0, ("tdb_chainunlock failed\n"));
752                 return -1;
753         }
754
755         return 0;
756 }
757
758 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
759                                                TALLOC_CTX *mem_ctx,
760                                                TDB_DATA key,
761                                                bool persistent)
762 {
763         struct db_record *result;
764         struct db_ctdb_rec *crec;
765         NTSTATUS status;
766         TDB_DATA ctdb_data;
767         int migrate_attempts = 0;
768
769         if (!(result = talloc(mem_ctx, struct db_record))) {
770                 DEBUG(0, ("talloc failed\n"));
771                 return NULL;
772         }
773
774         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
775                 DEBUG(0, ("talloc failed\n"));
776                 TALLOC_FREE(result);
777                 return NULL;
778         }
779
780         result->private_data = (void *)crec;
781         crec->ctdb_ctx = ctx;
782
783         result->key.dsize = key.dsize;
784         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
785         if (result->key.dptr == NULL) {
786                 DEBUG(0, ("talloc failed\n"));
787                 TALLOC_FREE(result);
788                 return NULL;
789         }
790
791         /*
792          * Do a blocking lock on the record
793          */
794 again:
795
796         if (DEBUGLEVEL >= 10) {
797                 char *keystr = hex_encode(result, key.dptr, key.dsize);
798                 DEBUG(10, (DEBUGLEVEL > 10
799                            ? "Locking db %u key %s\n"
800                            : "Locking db %u key %.20s\n",
801                            (int)crec->ctdb_ctx->db_id, keystr));
802                 TALLOC_FREE(keystr);
803         }
804         
805         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
806                 DEBUG(3, ("tdb_chainlock failed\n"));
807                 TALLOC_FREE(result);
808                 return NULL;
809         }
810
811         result->store = db_ctdb_store;
812         result->delete_rec = db_ctdb_delete;
813         talloc_set_destructor(result, db_ctdb_record_destr);
814
815         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
816
817         /*
818          * See if we have a valid record and we are the dmaster. If so, we can
819          * take the shortcut and just return it.
820          */
821
822         if ((ctdb_data.dptr == NULL) ||
823             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
824             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
825 #if 0
826             || (random() % 2 != 0)
827 #endif
828 ) {
829                 SAFE_FREE(ctdb_data.dptr);
830                 tdb_chainunlock(ctx->wtdb->tdb, key);
831                 talloc_set_destructor(result, NULL);
832
833                 migrate_attempts += 1;
834
835                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
836                            ctdb_data.dptr, ctdb_data.dptr ?
837                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
838                            get_my_vnn()));
839
840                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
841                 if (!NT_STATUS_IS_OK(status)) {
842                         DEBUG(5, ("ctdb_migrate failed: %s\n",
843                                   nt_errstr(status)));
844                         TALLOC_FREE(result);
845                         return NULL;
846                 }
847                 /* now its migrated, try again */
848                 goto again;
849         }
850
851         if (migrate_attempts > 10) {
852                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
853                           migrate_attempts));
854         }
855
856         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
857
858         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
859         result->value.dptr = NULL;
860
861         if ((result->value.dsize != 0)
862             && !(result->value.dptr = (uint8 *)talloc_memdup(
863                          result, ctdb_data.dptr + sizeof(crec->header),
864                          result->value.dsize))) {
865                 DEBUG(0, ("talloc failed\n"));
866                 TALLOC_FREE(result);
867         }
868
869         SAFE_FREE(ctdb_data.dptr);
870
871         return result;
872 }
873
874 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
875                                               TALLOC_CTX *mem_ctx,
876                                               TDB_DATA key)
877 {
878         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
879                                                         struct db_ctdb_ctx);
880
881         if (ctx->transaction != NULL) {
882                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
883         }
884
885         if (db->persistent) {
886                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
887         }
888
889         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
890 }
891
892 /*
893   fetch (unlocked, no migration) operation on ctdb
894  */
895 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
896                          TDB_DATA key, TDB_DATA *data)
897 {
898         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
899                                                         struct db_ctdb_ctx);
900         NTSTATUS status;
901         TDB_DATA ctdb_data;
902
903         if (ctx->transaction) {
904                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
905         }
906
907         /* try a direct fetch */
908         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
909
910         /*
911          * See if we have a valid record and we are the dmaster. If so, we can
912          * take the shortcut and just return it.
913          * we bypass the dmaster check for persistent databases
914          */
915         if ((ctdb_data.dptr != NULL) &&
916             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
917             (db->persistent ||
918              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
919                 /* we are the dmaster - avoid the ctdb protocol op */
920
921                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
922                 if (data->dsize == 0) {
923                         SAFE_FREE(ctdb_data.dptr);
924                         data->dptr = NULL;
925                         return 0;
926                 }
927
928                 data->dptr = (uint8 *)talloc_memdup(
929                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
930                         data->dsize);
931
932                 SAFE_FREE(ctdb_data.dptr);
933
934                 if (data->dptr == NULL) {
935                         return -1;
936                 }
937                 return 0;
938         }
939
940         SAFE_FREE(ctdb_data.dptr);
941
942         /* we weren't able to get it locally - ask ctdb to fetch it for us */
943         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
944         if (!NT_STATUS_IS_OK(status)) {
945                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
946                 return -1;
947         }
948
949         return 0;
950 }
951
952 struct traverse_state {
953         struct db_context *db;
954         int (*fn)(struct db_record *rec, void *private_data);
955         void *private_data;
956 };
957
958 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
959 {
960         struct traverse_state *state = (struct traverse_state *)private_data;
961         struct db_record *rec;
962         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
963         /* we have to give them a locked record to prevent races */
964         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
965         if (rec && rec->value.dsize > 0) {
966                 state->fn(rec, state->private_data);
967         }
968         talloc_free(tmp_ctx);
969 }
970
971 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
972                                         void *private_data)
973 {
974         struct traverse_state *state = (struct traverse_state *)private_data;
975         struct db_record *rec;
976         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
977         int ret = 0;
978         /* we have to give them a locked record to prevent races */
979         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
980         if (rec && rec->value.dsize > 0) {
981                 ret = state->fn(rec, state->private_data);
982         }
983         talloc_free(tmp_ctx);
984         return ret;
985 }
986
987 static int db_ctdb_traverse(struct db_context *db,
988                             int (*fn)(struct db_record *rec,
989                                       void *private_data),
990                             void *private_data)
991 {
992         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
993                                                         struct db_ctdb_ctx);
994         struct traverse_state state;
995
996         state.db = db;
997         state.fn = fn;
998         state.private_data = private_data;
999
1000         if (db->persistent) {
1001                 /* for persistent databases we don't need to do a ctdb traverse,
1002                    we can do a faster local traverse */
1003                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1004         }
1005
1006
1007         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1008         return 0;
1009 }
1010
1011 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1012 {
1013         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1014 }
1015
1016 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1017 {
1018         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1019 }
1020
1021 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1022 {
1023         struct traverse_state *state = (struct traverse_state *)private_data;
1024         struct db_record rec;
1025         rec.key = key;
1026         rec.value = data;
1027         rec.store = db_ctdb_store_deny;
1028         rec.delete_rec = db_ctdb_delete_deny;
1029         rec.private_data = state->db;
1030         state->fn(&rec, state->private_data);
1031 }
1032
1033 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1034                                         void *private_data)
1035 {
1036         struct traverse_state *state = (struct traverse_state *)private_data;
1037         struct db_record rec;
1038         rec.key = kbuf;
1039         rec.value = dbuf;
1040         rec.store = db_ctdb_store_deny;
1041         rec.delete_rec = db_ctdb_delete_deny;
1042         rec.private_data = state->db;
1043
1044         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1045                 /* a deleted record */
1046                 return 0;
1047         }
1048         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1049         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1050
1051         return state->fn(&rec, state->private_data);
1052 }
1053
1054 static int db_ctdb_traverse_read(struct db_context *db,
1055                                  int (*fn)(struct db_record *rec,
1056                                            void *private_data),
1057                                  void *private_data)
1058 {
1059         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1060                                                         struct db_ctdb_ctx);
1061         struct traverse_state state;
1062
1063         state.db = db;
1064         state.fn = fn;
1065         state.private_data = private_data;
1066
1067         if (db->persistent) {
1068                 /* for persistent databases we don't need to do a ctdb traverse,
1069                    we can do a faster local traverse */
1070                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1071         }
1072
1073         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1074         return 0;
1075 }
1076
1077 static int db_ctdb_get_seqnum(struct db_context *db)
1078 {
1079         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1080                                                         struct db_ctdb_ctx);
1081         return tdb_get_seqnum(ctx->wtdb->tdb);
1082 }
1083
1084 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1085                                 const char *name,
1086                                 int hash_size, int tdb_flags,
1087                                 int open_flags, mode_t mode)
1088 {
1089         struct db_context *result;
1090         struct db_ctdb_ctx *db_ctdb;
1091         char *db_path;
1092
1093         if (!lp_clustering()) {
1094                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1095                 return NULL;
1096         }
1097
1098         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1099                 DEBUG(0, ("talloc failed\n"));
1100                 TALLOC_FREE(result);
1101                 return NULL;
1102         }
1103
1104         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1105                 DEBUG(0, ("talloc failed\n"));
1106                 TALLOC_FREE(result);
1107                 return NULL;
1108         }
1109
1110         db_ctdb->transaction = NULL;
1111         db_ctdb->db = result;
1112
1113         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1114                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1115                 TALLOC_FREE(result);
1116                 return NULL;
1117         }
1118
1119         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1120
1121         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1122
1123         /* only pass through specific flags */
1124         tdb_flags &= TDB_SEQNUM;
1125
1126         /* honor permissions if user has specified O_CREAT */
1127         if (open_flags & O_CREAT) {
1128                 chmod(db_path, mode);
1129         }
1130
1131         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1132         if (db_ctdb->wtdb == NULL) {
1133                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1134                 TALLOC_FREE(result);
1135                 return NULL;
1136         }
1137         talloc_free(db_path);
1138
1139         result->private_data = (void *)db_ctdb;
1140         result->fetch_locked = db_ctdb_fetch_locked;
1141         result->fetch = db_ctdb_fetch;
1142         result->traverse = db_ctdb_traverse;
1143         result->traverse_read = db_ctdb_traverse_read;
1144         result->get_seqnum = db_ctdb_get_seqnum;
1145         result->transaction_start = db_ctdb_transaction_start;
1146         result->transaction_commit = db_ctdb_transaction_commit;
1147         result->transaction_cancel = db_ctdb_transaction_cancel;
1148
1149         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1150                  name, db_ctdb->db_id));
1151
1152         return result;
1153 }
1154 #endif