cleanup debugging and fix handling of empty transaction
[ira/wip.git] / source3 / lib / dbwrap_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007
5    
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
25
26 struct db_ctdb_transaction_handle {
27         struct db_ctdb_ctx *ctx;
28         bool in_replay;
29         /* we store the reads and writes done under a transaction one
30            list stores both reads and writes, the other just writes
31         */
32         struct ctdb_marshall_buffer *m_all;
33         struct ctdb_marshall_buffer *m_write;
34 };
35
36 struct db_ctdb_ctx {
37         struct tdb_wrap *wtdb;
38         uint32 db_id;
39         struct db_ctdb_transaction_handle *transaction;
40 };
41
42 struct db_ctdb_rec {
43         struct db_ctdb_ctx *ctdb_ctx;
44         struct ctdb_ltdb_header header;
45 };
46
47 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
48                                                TALLOC_CTX *mem_ctx,
49                                                TDB_DATA key,
50                                                bool persistent);
51
52 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
53 {
54         NTSTATUS status;
55         enum TDB_ERROR tret = tdb_error(tdb);
56
57         switch (tret) {
58         case TDB_ERR_EXISTS:
59                 status = NT_STATUS_OBJECT_NAME_COLLISION;
60                 break;
61         case TDB_ERR_NOEXIST:
62                 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
63                 break;
64         default:
65                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
66                 break;
67         }
68
69         return status;
70 }
71
72
73
74 /*
75   form a ctdb_rec_data record from a key/data pair
76   
77   note that header may be NULL. If not NULL then it is included in the data portion
78   of the record
79  */
80 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,       
81                                                   TDB_DATA key, 
82                                                   struct ctdb_ltdb_header *header,
83                                                   TDB_DATA data)
84 {
85         size_t length;
86         struct ctdb_rec_data *d;
87
88         length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
89                 data.dsize + (header?sizeof(*header):0);
90         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
91         if (d == NULL) {
92                 return NULL;
93         }
94         d->length = length;
95         d->reqid = reqid;
96         d->keylen = key.dsize;
97         memcpy(&d->data[0], key.dptr, key.dsize);
98         if (header) {
99                 d->datalen = data.dsize + sizeof(*header);
100                 memcpy(&d->data[key.dsize], header, sizeof(*header));
101                 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
102         } else {
103                 d->datalen = data.dsize;
104                 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
105         }
106         return d;
107 }
108
109
110 /* helper function for marshalling multiple records */
111 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx, 
112                                                struct ctdb_marshall_buffer *m,
113                                                uint64_t db_id,
114                                                uint32_t reqid,
115                                                TDB_DATA key,
116                                                struct ctdb_ltdb_header *header,
117                                                TDB_DATA data)
118 {
119         struct ctdb_rec_data *r;
120         size_t m_size, r_size;
121         struct ctdb_marshall_buffer *m2;
122
123         r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
124         if (r == NULL) {
125                 talloc_free(m);
126                 return NULL;
127         }
128
129         if (m == NULL) {
130                 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
131                 if (m == NULL) {
132                         return NULL;
133                 }
134                 m->db_id = db_id;
135         }
136
137         m_size = talloc_get_size(m);
138         r_size = talloc_get_size(r);
139
140         m2 = talloc_realloc_size(mem_ctx, m,  m_size + r_size);
141         if (m2 == NULL) {
142                 talloc_free(m);
143                 return NULL;
144         }
145
146         memcpy(m_size + (uint8_t *)m2, r, r_size);
147
148         talloc_free(r);
149
150         m2->count++;
151
152         return m2;
153 }
154
155 /* we've finished marshalling, return a data blob with the marshalled records */
156 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
157 {
158         TDB_DATA data;
159         data.dptr = (uint8_t *)m;
160         data.dsize = talloc_get_size(m);
161         return data;
162 }
163
164 /* 
165    loop over a marshalling buffer 
166    
167      - pass r==NULL to start
168      - loop the number of times indicated by m->count
169 */
170 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
171                                                      uint32_t *reqid,
172                                                      struct ctdb_ltdb_header *header,
173                                                      TDB_DATA *key, TDB_DATA *data)
174 {
175         if (r == NULL) {
176                 r = (struct ctdb_rec_data *)&m->data[0];
177         } else {
178                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
179         }
180
181         if (reqid != NULL) {
182                 *reqid = r->reqid;
183         }
184         
185         if (key != NULL) {
186                 key->dptr   = &r->data[0];
187                 key->dsize  = r->keylen;
188         }
189         if (data != NULL) {
190                 data->dptr  = &r->data[r->keylen];
191                 data->dsize = r->datalen;
192                 if (header != NULL) {
193                         data->dptr += sizeof(*header);
194                         data->dsize -= sizeof(*header);
195                 }
196         }
197
198         if (header != NULL) {
199                 if (r->datalen < sizeof(*header)) {
200                         return NULL;
201                 }
202                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
203         }
204
205         return r;
206 }
207
208
209
210 /* start a transaction on a database */
211 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
212 {
213         tdb_transaction_cancel(h->ctx->wtdb->tdb);
214         return 0;
215 }
216
217 /* start a transaction on a database */
218 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
219 {
220         struct db_record *rh;
221         TDB_DATA key;
222         TALLOC_CTX *tmp_ctx;
223         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
224         int ret;
225         struct db_ctdb_ctx *ctx = h->ctx;
226         TDB_DATA data;
227
228         key.dptr = discard_const(keyname);
229         key.dsize = strlen(keyname);
230
231 again:
232         tmp_ctx = talloc_new(h);
233
234         rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
235         if (rh == NULL) {
236                 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
237                 talloc_free(tmp_ctx);
238                 return -1;
239         }
240         talloc_free(rh);
241
242         ret = tdb_transaction_start(ctx->wtdb->tdb);
243         if (ret != 0) {
244                 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
245                 talloc_free(tmp_ctx);
246                 return -1;
247         }
248
249         data = tdb_fetch(ctx->wtdb->tdb, key);
250         if ((data.dptr == NULL) ||
251             (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
252             ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
253                 SAFE_FREE(data.dptr);
254                 tdb_transaction_cancel(ctx->wtdb->tdb);
255                 talloc_free(tmp_ctx);
256                 goto again;
257         }
258
259         SAFE_FREE(data.dptr);
260         talloc_free(tmp_ctx);
261
262         return 0;
263 }
264
265
266 /* start a transaction on a database */
267 static int db_ctdb_transaction_start(struct db_context *db)
268 {
269         struct db_ctdb_transaction_handle *h;
270         int ret;
271         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
272                                                         struct db_ctdb_ctx);
273
274         if (!db->persistent) {
275                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
276                          ctx->db_id));
277                 return -1;
278         }
279
280         if (ctx->transaction) {
281                 DEBUG(0,("Nested transactions not supported on db 0x%08x\n", ctx->db_id));
282                 return -1;
283         }
284
285         h = talloc_zero(db, struct db_ctdb_transaction_handle);
286         if (h == NULL) {
287                 DEBUG(0,(__location__ " oom for transaction handle\n"));                
288                 return -1;
289         }
290
291         h->ctx = ctx;
292
293         ret = db_ctdb_transaction_fetch_start(h);
294         if (ret != 0) {
295                 talloc_free(h);
296                 return -1;
297         }
298
299         talloc_set_destructor(h, db_ctdb_transaction_destructor);
300
301         ctx->transaction = h;
302
303         DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
304
305         return 0;
306 }
307
308
309
310 /*
311   fetch a record inside a transaction
312  */
313 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db, 
314                                      TALLOC_CTX *mem_ctx, 
315                                      TDB_DATA key, TDB_DATA *data)
316 {
317         struct db_ctdb_transaction_handle *h = db->transaction;
318
319         *data = tdb_fetch(h->ctx->wtdb->tdb, key);
320
321         if (data->dptr != NULL) {
322                 uint8_t *oldptr = (uint8_t *)data->dptr;
323                 data->dsize -= sizeof(struct ctdb_ltdb_header);
324                 data->dptr = (uint8 *)
325                         talloc_memdup(
326                                 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
327                                 data->dsize);
328                 SAFE_FREE(oldptr);
329                 if (data->dptr == NULL) {
330                         return -1;
331                 }
332         }
333
334         if (!h->in_replay) {
335                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
336                 if (h->m_all == NULL) {
337                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
338                         data->dsize = 0;
339                         talloc_free(data->dptr);
340                         return -1;
341                 }
342         }
343
344         return 0;
345 }
346
347
348 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
349 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
350
351 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
352                                                           TALLOC_CTX *mem_ctx,
353                                                           TDB_DATA key)
354 {
355         struct db_record *result;
356         TDB_DATA ctdb_data;
357
358         if (!(result = talloc(mem_ctx, struct db_record))) {
359                 DEBUG(0, ("talloc failed\n"));
360                 return NULL;
361         }
362
363         result->private_data = ctx->transaction;
364
365         result->key.dsize = key.dsize;
366         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
367         if (result->key.dptr == NULL) {
368                 DEBUG(0, ("talloc failed\n"));
369                 TALLOC_FREE(result);
370                 return NULL;
371         }
372
373         result->store = db_ctdb_store_transaction;
374         result->delete_rec = db_ctdb_delete_transaction;
375
376         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
377         if (ctdb_data.dptr == NULL) {
378                 /* create the record */
379                 result->value = tdb_null;
380                 return result;
381         }
382
383         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
384         result->value.dptr = NULL;
385
386         if ((result->value.dsize != 0)
387             && !(result->value.dptr = (uint8 *)talloc_memdup(
388                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
389                          result->value.dsize))) {
390                 DEBUG(0, ("talloc failed\n"));
391                 TALLOC_FREE(result);
392         }
393
394         SAFE_FREE(ctdb_data.dptr);
395
396         return result;
397 }
398
399
400 /*
401   stores a record inside a transaction
402  */
403 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h, 
404                                      TDB_DATA key, TDB_DATA data)
405 {
406         TALLOC_CTX *tmp_ctx = talloc_new(h);
407         int ret;
408         TDB_DATA rec;
409         struct ctdb_ltdb_header header;
410
411         /* we need the header so we can update the RSN */
412         rec = tdb_fetch(h->ctx->wtdb->tdb, key);
413         if (rec.dptr == NULL) {
414                 /* the record doesn't exist - create one with us as dmaster.
415                    This is only safe because we are in a transaction and this
416                    is a persistent database */
417                 ZERO_STRUCT(header);
418                 header.dmaster = get_my_vnn();
419         } else {
420                 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
421                 SAFE_FREE(rec.dptr);
422         }
423
424         header.rsn++;
425
426         if (!h->in_replay) {
427                 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
428                 if (h->m_all == NULL) {
429                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
430                         talloc_free(tmp_ctx);
431                         return -1;
432                 }
433                 
434                 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
435                 if (h->m_write == NULL) {
436                         DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
437                         talloc_free(tmp_ctx);
438                         return -1;
439                 }
440         }
441         
442         rec.dptr = talloc_size(tmp_ctx, data.dsize + sizeof(struct ctdb_ltdb_header));
443         if (rec.dptr == NULL) {
444                 DEBUG(0,(__location__ " Failed to alloc record\n"));
445                 talloc_free(tmp_ctx);
446                 return -1;
447         }
448         memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
449         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
450
451         ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
452
453         talloc_free(tmp_ctx);
454         
455         return ret;
456 }
457
458
459 /* 
460    a record store inside a transaction
461  */
462 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
463 {
464         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
465                 rec->private_data, struct db_ctdb_transaction_handle);
466         int ret;
467
468         ret = db_ctdb_transaction_store(h, rec->key, data);
469         if (ret != 0) {
470                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
471         }
472         return NT_STATUS_OK;
473 }
474
475 /* 
476    a record delete inside a transaction
477  */
478 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
479 {
480         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
481                 rec->private_data, struct db_ctdb_transaction_handle);
482         int ret;
483
484         ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
485         if (ret != 0) {
486                 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
487         }
488         return NT_STATUS_OK;
489 }
490
491
492 /*
493   replay a transaction
494  */
495 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
496 {
497         int ret, i;
498         struct ctdb_rec_data *rec = NULL;
499
500         h->in_replay = true;
501
502         ret = db_ctdb_transaction_fetch_start(h);
503         if (ret != 0) {
504                 return ret;
505         }
506
507         for (i=0;i<h->m_all->count;i++) {
508                 TDB_DATA key, data;
509
510                 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
511                 if (rec == NULL) {
512                         DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
513                         goto failed;
514                 }
515
516                 if (rec->reqid == 0) {
517                         /* its a store */
518                         if (db_ctdb_transaction_store(h, key, data) != 0) {
519                                 goto failed;
520                         }
521                 } else {
522                         TDB_DATA data2;
523                         TALLOC_CTX *tmp_ctx = talloc_new(h);
524
525                         if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
526                                 talloc_free(tmp_ctx);
527                                 goto failed;
528                         }
529                         if (data2.dsize != data.dsize ||
530                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
531                                 /* the record has changed on us - we have to give up */
532                                 talloc_free(tmp_ctx);
533                                 goto failed;
534                         }
535                         talloc_free(tmp_ctx);
536                 }
537         }
538         
539         return 0;
540
541 failed:
542         tdb_transaction_cancel(h->ctx->wtdb->tdb);
543         return -1;
544 }
545
546
547 /*
548   commit a transaction
549  */
550 static int db_ctdb_transaction_commit(struct db_context *db)
551 {
552         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
553                                                         struct db_ctdb_ctx);
554         NTSTATUS rets;
555         int ret;
556         int status;
557         struct db_ctdb_transaction_handle *h = ctx->transaction;
558
559         if (h == NULL) {
560                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
561                 return -1;
562         }
563
564         DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
565
566         if (h->m_write == NULL) {
567                 /* no changes were made */
568                 talloc_free(h);
569                 ctx->transaction = NULL;
570                 return 0;
571         }
572
573         talloc_set_destructor(h, NULL);
574
575         /* our commit strategy is quite complex.
576
577            - we first try to commit the changes to all other nodes
578
579            - if that works, then we commit locally and we are done
580
581            - if a commit on another node fails, then we need to cancel
582              the transaction, then restart the transaction (thus
583              opening a window of time for a pending recovery to
584              complete), then replay the transaction, checking all the
585              reads and writes (checking that reads give the same data,
586              and writes succeed). Then we retry the transaction to the
587              other nodes
588         */
589
590 again:
591         /* tell ctdbd to commit to the other nodes */
592         rets = ctdbd_control_local(messaging_ctdbd_connection(), 
593                                   CTDB_CONTROL_TRANS2_COMMIT, h->ctx->db_id, 0,
594                                   db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
595         if (!NT_STATUS_IS_OK(rets) || status != 0) {
596                 tdb_transaction_cancel(h->ctx->wtdb->tdb);
597                 sleep(1);
598                 if (ctdb_replay_transaction(h) != 0) {
599                         DEBUG(0,(__location__ " Failed to replay transaction\n"));
600                         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR,
601                                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
602                                             tdb_null, NULL, NULL, NULL);
603                         h->ctx->transaction = NULL;
604                         talloc_free(h);
605                         ctx->transaction = NULL;
606                         return -1;
607                 }
608                 goto again;
609         }
610
611         /* do the real commit locally */
612         ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
613         if (ret != 0) {
614                 DEBUG(0,(__location__ " Failed to commit transaction\n"));
615                 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_ERROR, h->ctx->db_id, 
616                                     CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
617                 h->ctx->transaction = NULL;
618                 talloc_free(h);
619                 return ret;
620         }
621
622         /* tell ctdbd that we are finished with our local commit */
623         ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
624                             h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
625                             tdb_null, NULL, NULL, NULL);
626         h->ctx->transaction = NULL;
627         talloc_free(h);
628         return 0;
629 }
630
631
632 /*
633   cancel a transaction
634  */
635 static int db_ctdb_transaction_cancel(struct db_context *db)
636 {
637         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
638                                                         struct db_ctdb_ctx);
639         struct db_ctdb_transaction_handle *h = ctx->transaction;
640
641         if (h == NULL) {
642                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
643                 return -1;
644         }
645
646         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
647
648         ctx->transaction = NULL;
649         talloc_free(h);
650         return 0;
651 }
652
653
654 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
655 {
656         struct db_ctdb_rec *crec = talloc_get_type_abort(
657                 rec->private_data, struct db_ctdb_rec);
658         TDB_DATA cdata;
659         int ret;
660
661         cdata.dsize = sizeof(crec->header) + data.dsize;
662
663         if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
664                 return NT_STATUS_NO_MEMORY;
665         }
666
667         memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
668         memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
669
670         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
671
672         SAFE_FREE(cdata.dptr);
673
674         return (ret == 0) ? NT_STATUS_OK
675                           : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
676 }
677
678
679 /* for persistent databases the store is a bit different. We have to
680    ask the ctdb daemon to push the record to all nodes after the
681    store */
682 static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
683 {
684         struct db_ctdb_rec *crec;
685         struct db_record *record;
686         TDB_DATA cdata;
687         int ret;
688         NTSTATUS status;
689         uint32_t count;
690         int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
691
692         for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
693              (count < max_retries) && !NT_STATUS_IS_OK(status);
694              count++)
695         {
696                 if (count > 0) {
697                         /* retry */
698                         /*
699                          * There is a hack here: We use rec as a memory
700                          * context and re-use it as the record struct ptr.
701                          * We don't free the record data allocated
702                          * in each turn. So all gets freed when the caller
703                          * releases the original record. This is because
704                          * we don't get the record passed in by reference
705                          * in the first place and the caller relies on
706                          * having to free the record himself.
707                          */
708                         record = fetch_locked_internal(crec->ctdb_ctx,
709                                                        rec,
710                                                        rec->key,
711                                                        true /* persistent */);
712                         if (record == NULL) {
713                                 DEBUG(5, ("fetch_locked_internal failed.\n"));
714                                 status = NT_STATUS_NO_MEMORY;
715                                 break;
716                         }
717                 }
718
719                 crec = talloc_get_type_abort(record->private_data,
720                                              struct db_ctdb_rec);
721
722                 cdata.dsize = sizeof(crec->header) + data.dsize;
723
724                 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
725                         return NT_STATUS_NO_MEMORY;
726                 }
727
728                 crec->header.rsn++;
729
730                 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
731                 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
732
733                 status = ctdbd_start_persistent_update(
734                                 messaging_ctdbd_connection(),
735                                 crec->ctdb_ctx->db_id,
736                                 rec->key,
737                                 cdata);
738
739                 if (NT_STATUS_IS_OK(status)) {
740                         ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
741                                         cdata, TDB_REPLACE);
742                         status = (ret == 0)
743                                         ? NT_STATUS_OK
744                                         : tdb_error_to_ntstatus(
745                                                 crec->ctdb_ctx->wtdb->tdb);
746                 }
747
748                 /*
749                  * release the lock *now* in order to prevent deadlocks.
750                  *
751                  * There is a tradeoff: Usually, the record is still locked
752                  * after db->store operation. This lock is usually released
753                  * via the talloc destructor with the TALLOC_FREE to
754                  * the record. So we have two choices:
755                  *
756                  * - Either re-lock the record after the call to persistent_store
757                  *   or cancel_persistent update and this way not changing any
758                  *   assumptions callers may have about the state, but possibly
759                  *   introducing new race conditions.
760                  *
761                  * - Or don't lock the record again but just remove the
762                  *   talloc_destructor. This is less racy but assumes that
763                  *   the lock is always released via TALLOC_FREE of the record.
764                  *
765                  * I choose the first variant for now since it seems less racy.
766                  * We can't guarantee that we succeed in getting the lock
767                  * anyways. The only real danger here is that a caller
768                  * performs multiple store operations after a fetch_locked()
769                  * which is currently not the case.
770                  */
771                 tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
772                 talloc_set_destructor(record, NULL);
773
774                 /* now tell ctdbd to update this record on all other nodes */
775                 if (NT_STATUS_IS_OK(status)) {
776                         status = ctdbd_persistent_store(
777                                         messaging_ctdbd_connection(),
778                                         crec->ctdb_ctx->db_id,
779                                         rec->key,
780                                         cdata);
781                 } else {
782                         ctdbd_cancel_persistent_update(
783                                         messaging_ctdbd_connection(),
784                                         crec->ctdb_ctx->db_id,
785                                         rec->key,
786                                         cdata);
787                         break;
788                 }
789
790                 SAFE_FREE(cdata.dptr);
791         } /* retry-loop */
792
793         if (!NT_STATUS_IS_OK(status)) {
794                 DEBUG(5, ("ctdbd_persistent_store failed after "
795                           "%d retries with error %s - giving up.\n",
796                           count, nt_errstr(status)));
797         }
798
799         SAFE_FREE(cdata.dptr);
800
801         return status;
802 }
803
804 static NTSTATUS db_ctdb_delete(struct db_record *rec)
805 {
806         TDB_DATA data;
807
808         /*
809          * We have to store the header with empty data. TODO: Fix the
810          * tdb-level cleanup
811          */
812
813         ZERO_STRUCT(data);
814
815         return db_ctdb_store(rec, data, 0);
816
817 }
818
819 static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
820 {
821         TDB_DATA data;
822
823         /*
824          * We have to store the header with empty data. TODO: Fix the
825          * tdb-level cleanup
826          */
827
828         ZERO_STRUCT(data);
829
830         return db_ctdb_store_persistent(rec, data, 0);
831
832 }
833
834 static int db_ctdb_record_destr(struct db_record* data)
835 {
836         struct db_ctdb_rec *crec = talloc_get_type_abort(
837                 data->private_data, struct db_ctdb_rec);
838
839         DEBUG(10, (DEBUGLEVEL > 10
840                    ? "Unlocking db %u key %s\n"
841                    : "Unlocking db %u key %.20s\n",
842                    (int)crec->ctdb_ctx->db_id,
843                    hex_encode(data, (unsigned char *)data->key.dptr,
844                               data->key.dsize)));
845
846         if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
847                 DEBUG(0, ("tdb_chainunlock failed\n"));
848                 return -1;
849         }
850
851         return 0;
852 }
853
854 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
855                                                TALLOC_CTX *mem_ctx,
856                                                TDB_DATA key,
857                                                bool persistent)
858 {
859         struct db_record *result;
860         struct db_ctdb_rec *crec;
861         NTSTATUS status;
862         TDB_DATA ctdb_data;
863         int migrate_attempts = 0;
864
865         if (ctx->transaction != NULL) {
866                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
867         }
868
869         if (!(result = talloc(mem_ctx, struct db_record))) {
870                 DEBUG(0, ("talloc failed\n"));
871                 return NULL;
872         }
873
874         if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
875                 DEBUG(0, ("talloc failed\n"));
876                 TALLOC_FREE(result);
877                 return NULL;
878         }
879
880         result->private_data = (void *)crec;
881         crec->ctdb_ctx = ctx;
882
883         result->key.dsize = key.dsize;
884         result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
885         if (result->key.dptr == NULL) {
886                 DEBUG(0, ("talloc failed\n"));
887                 TALLOC_FREE(result);
888                 return NULL;
889         }
890
891         /*
892          * Do a blocking lock on the record
893          */
894 again:
895
896         if (DEBUGLEVEL >= 10) {
897                 char *keystr = hex_encode(result, key.dptr, key.dsize);
898                 DEBUG(10, (DEBUGLEVEL > 10
899                            ? "Locking db %u key %s\n"
900                            : "Locking db %u key %.20s\n",
901                            (int)crec->ctdb_ctx->db_id, keystr));
902                 TALLOC_FREE(keystr);
903         }
904         
905         if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
906                 DEBUG(3, ("tdb_chainlock failed\n"));
907                 TALLOC_FREE(result);
908                 return NULL;
909         }
910
911         if (persistent) {
912                 result->store = db_ctdb_store_persistent;
913                 result->delete_rec = db_ctdb_delete_persistent;
914         } else {
915                 result->store = db_ctdb_store;
916                 result->delete_rec = db_ctdb_delete;
917         }
918         talloc_set_destructor(result, db_ctdb_record_destr);
919
920         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
921
922         /*
923          * See if we have a valid record and we are the dmaster. If so, we can
924          * take the shortcut and just return it.
925          */
926
927         if ((ctdb_data.dptr == NULL) ||
928             (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
929             ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
930 #if 0
931             || (random() % 2 != 0)
932 #endif
933 ) {
934                 SAFE_FREE(ctdb_data.dptr);
935                 tdb_chainunlock(ctx->wtdb->tdb, key);
936                 talloc_set_destructor(result, NULL);
937
938                 migrate_attempts += 1;
939
940                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
941                            ctdb_data.dptr, ctdb_data.dptr ?
942                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
943                            get_my_vnn()));
944
945                 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
946                 if (!NT_STATUS_IS_OK(status)) {
947                         DEBUG(5, ("ctdb_migrate failed: %s\n",
948                                   nt_errstr(status)));
949                         TALLOC_FREE(result);
950                         return NULL;
951                 }
952                 /* now its migrated, try again */
953                 goto again;
954         }
955
956         if (migrate_attempts > 10) {
957                 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
958                           migrate_attempts));
959         }
960
961         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
962
963         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
964         result->value.dptr = NULL;
965
966         if ((result->value.dsize != 0)
967             && !(result->value.dptr = (uint8 *)talloc_memdup(
968                          result, ctdb_data.dptr + sizeof(crec->header),
969                          result->value.dsize))) {
970                 DEBUG(0, ("talloc failed\n"));
971                 TALLOC_FREE(result);
972         }
973
974         SAFE_FREE(ctdb_data.dptr);
975
976         return result;
977 }
978
979 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
980                                               TALLOC_CTX *mem_ctx,
981                                               TDB_DATA key)
982 {
983         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
984                                                         struct db_ctdb_ctx);
985
986         return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
987 }
988
989 /*
990   fetch (unlocked, no migration) operation on ctdb
991  */
992 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
993                          TDB_DATA key, TDB_DATA *data)
994 {
995         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
996                                                         struct db_ctdb_ctx);
997         NTSTATUS status;
998         TDB_DATA ctdb_data;
999
1000         if (ctx->transaction) {
1001                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1002         }
1003
1004         /* try a direct fetch */
1005         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1006
1007         /*
1008          * See if we have a valid record and we are the dmaster. If so, we can
1009          * take the shortcut and just return it.
1010          * we bypass the dmaster check for persistent databases
1011          */
1012         if ((ctdb_data.dptr != NULL) &&
1013             (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1014             (db->persistent ||
1015              ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1016                 /* we are the dmaster - avoid the ctdb protocol op */
1017
1018                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1019                 if (data->dsize == 0) {
1020                         SAFE_FREE(ctdb_data.dptr);
1021                         data->dptr = NULL;
1022                         return 0;
1023                 }
1024
1025                 data->dptr = (uint8 *)talloc_memdup(
1026                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1027                         data->dsize);
1028
1029                 SAFE_FREE(ctdb_data.dptr);
1030
1031                 if (data->dptr == NULL) {
1032                         return -1;
1033                 }
1034                 return 0;
1035         }
1036
1037         SAFE_FREE(ctdb_data.dptr);
1038
1039         /* we weren't able to get it locally - ask ctdb to fetch it for us */
1040         status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1041         if (!NT_STATUS_IS_OK(status)) {
1042                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1043                 return -1;
1044         }
1045
1046         return 0;
1047 }
1048
1049 struct traverse_state {
1050         struct db_context *db;
1051         int (*fn)(struct db_record *rec, void *private_data);
1052         void *private_data;
1053 };
1054
1055 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1056 {
1057         struct traverse_state *state = (struct traverse_state *)private_data;
1058         struct db_record *rec;
1059         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1060         /* we have to give them a locked record to prevent races */
1061         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1062         if (rec && rec->value.dsize > 0) {
1063                 state->fn(rec, state->private_data);
1064         }
1065         talloc_free(tmp_ctx);
1066 }
1067
1068 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1069                                         void *private_data)
1070 {
1071         struct traverse_state *state = (struct traverse_state *)private_data;
1072         struct db_record *rec;
1073         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1074         int ret = 0;
1075         /* we have to give them a locked record to prevent races */
1076         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1077         if (rec && rec->value.dsize > 0) {
1078                 ret = state->fn(rec, state->private_data);
1079         }
1080         talloc_free(tmp_ctx);
1081         return ret;
1082 }
1083
1084 static int db_ctdb_traverse(struct db_context *db,
1085                             int (*fn)(struct db_record *rec,
1086                                       void *private_data),
1087                             void *private_data)
1088 {
1089         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1090                                                         struct db_ctdb_ctx);
1091         struct traverse_state state;
1092
1093         state.db = db;
1094         state.fn = fn;
1095         state.private_data = private_data;
1096
1097         if (db->persistent) {
1098                 /* for persistent databases we don't need to do a ctdb traverse,
1099                    we can do a faster local traverse */
1100                 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1101         }
1102
1103
1104         ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1105         return 0;
1106 }
1107
1108 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1109 {
1110         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1111 }
1112
1113 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1114 {
1115         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1116 }
1117
1118 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1119 {
1120         struct traverse_state *state = (struct traverse_state *)private_data;
1121         struct db_record rec;
1122         rec.key = key;
1123         rec.value = data;
1124         rec.store = db_ctdb_store_deny;
1125         rec.delete_rec = db_ctdb_delete_deny;
1126         rec.private_data = state->db;
1127         state->fn(&rec, state->private_data);
1128 }
1129
1130 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1131                                         void *private_data)
1132 {
1133         struct traverse_state *state = (struct traverse_state *)private_data;
1134         struct db_record rec;
1135         rec.key = kbuf;
1136         rec.value = dbuf;
1137         rec.store = db_ctdb_store_deny;
1138         rec.delete_rec = db_ctdb_delete_deny;
1139         rec.private_data = state->db;
1140
1141         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1142                 /* a deleted record */
1143                 return 0;
1144         }
1145         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1146         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1147
1148         return state->fn(&rec, state->private_data);
1149 }
1150
1151 static int db_ctdb_traverse_read(struct db_context *db,
1152                                  int (*fn)(struct db_record *rec,
1153                                            void *private_data),
1154                                  void *private_data)
1155 {
1156         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1157                                                         struct db_ctdb_ctx);
1158         struct traverse_state state;
1159
1160         state.db = db;
1161         state.fn = fn;
1162         state.private_data = private_data;
1163
1164         if (db->persistent) {
1165                 /* for persistent databases we don't need to do a ctdb traverse,
1166                    we can do a faster local traverse */
1167                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1168         }
1169
1170         ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1171         return 0;
1172 }
1173
1174 static int db_ctdb_get_seqnum(struct db_context *db)
1175 {
1176         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1177                                                         struct db_ctdb_ctx);
1178         return tdb_get_seqnum(ctx->wtdb->tdb);
1179 }
1180
1181 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1182                                 const char *name,
1183                                 int hash_size, int tdb_flags,
1184                                 int open_flags, mode_t mode)
1185 {
1186         struct db_context *result;
1187         struct db_ctdb_ctx *db_ctdb;
1188         char *db_path;
1189
1190         if (!lp_clustering()) {
1191                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1192                 return NULL;
1193         }
1194
1195         if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1196                 DEBUG(0, ("talloc failed\n"));
1197                 TALLOC_FREE(result);
1198                 return NULL;
1199         }
1200
1201         if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1202                 DEBUG(0, ("talloc failed\n"));
1203                 TALLOC_FREE(result);
1204                 return NULL;
1205         }
1206
1207         db_ctdb->transaction = NULL;
1208
1209         if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1210                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1211                 TALLOC_FREE(result);
1212                 return NULL;
1213         }
1214
1215         db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1216
1217         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1218
1219         /* only pass through specific flags */
1220         tdb_flags &= TDB_SEQNUM;
1221
1222         /* honor permissions if user has specified O_CREAT */
1223         if (open_flags & O_CREAT) {
1224                 chmod(db_path, mode);
1225         }
1226
1227         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1228         if (db_ctdb->wtdb == NULL) {
1229                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1230                 TALLOC_FREE(result);
1231                 return NULL;
1232         }
1233         talloc_free(db_path);
1234
1235         result->private_data = (void *)db_ctdb;
1236         result->fetch_locked = db_ctdb_fetch_locked;
1237         result->fetch = db_ctdb_fetch;
1238         result->traverse = db_ctdb_traverse;
1239         result->traverse_read = db_ctdb_traverse_read;
1240         result->get_seqnum = db_ctdb_get_seqnum;
1241         result->transaction_start = db_ctdb_transaction_start;
1242         result->transaction_commit = db_ctdb_transaction_commit;
1243         result->transaction_cancel = db_ctdb_transaction_cancel;
1244
1245         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1246                  name, db_ctdb->db_id));
1247
1248         return result;
1249 }
1250 #endif