s3: Factor out db_ctdb_marshall_loop_next_key from db_ctdb_marshall_loop_next
[obnox/samba/samba-obnox.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
1 /*
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007-2009
5    Copyright (C) Michael Adam 2009
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
24 #include "util_tdb.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
29
30 #ifdef CLUSTER_SUPPORT
31
32 /*
33  * It is not possible to include ctdb.h and tdb_compat.h (included via
34  * some other include above) without warnings. This fixes those
35  * warnings.
36  */
37
38 #ifdef typesafe_cb
39 #undef typesafe_cb
40 #endif
41
42 #ifdef typesafe_cb_preargs
43 #undef typesafe_cb_preargs
44 #endif
45
46 #ifdef typesafe_cb_postargs
47 #undef typesafe_cb_postargs
48 #endif
49
50 #include "ctdb.h"
51 #include "ctdb_private.h"
52 #include "ctdbd_conn.h"
53 #include "dbwrap/dbwrap.h"
54 #include "dbwrap/dbwrap_private.h"
55 #include "dbwrap/dbwrap_ctdb.h"
56 #include "g_lock.h"
57 #include "messages.h"
58
59 struct db_ctdb_transaction_handle {
60         struct db_ctdb_ctx *ctx;
61         /*
62          * we store the writes done under a transaction:
63          */
64         struct ctdb_marshall_buffer *m_write;
65         uint32_t nesting;
66         bool nested_cancel;
67         char *lock_name;
68 };
69
70 struct db_ctdb_ctx {
71         struct db_context *db;
72         struct tdb_wrap *wtdb;
73         uint32_t db_id;
74         struct db_ctdb_transaction_handle *transaction;
75         struct g_lock_ctx *lock_ctx;
76 };
77
78 struct db_ctdb_rec {
79         struct db_ctdb_ctx *ctdb_ctx;
80         struct ctdb_ltdb_header header;
81         struct timeval lock_time;
82 };
83
84 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
85 {
86         enum TDB_ERROR tret = tdb_error(tdb);
87
88         return map_nt_error_from_tdb(tret);
89 }
90
91 struct db_ctdb_ltdb_parse_state {
92         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
93                        TDB_DATA data, void *private_data);
94         void *private_data;
95 };
96
97 static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
98                                void *private_data)
99 {
100         struct db_ctdb_ltdb_parse_state *state =
101                 (struct db_ctdb_ltdb_parse_state *)private_data;
102
103         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
104                 return -1;
105         }
106         state->parser(
107                 key, (struct ctdb_ltdb_header *)data.dptr,
108                 make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
109                               data.dsize - sizeof(struct ctdb_ltdb_header)),
110                 state->private_data);
111         return 0;
112 }
113
114 static NTSTATUS db_ctdb_ltdb_parse(
115         struct db_ctdb_ctx *db, TDB_DATA key,
116         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
117                        TDB_DATA data, void *private_data),
118         void *private_data)
119 {
120         struct db_ctdb_ltdb_parse_state state;
121         int ret;
122
123         state.parser = parser;
124         state.private_data = private_data;
125
126         ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
127                                &state);
128         if (ret == -1) {
129                 return NT_STATUS_NOT_FOUND;
130         }
131         return NT_STATUS_OK;
132 }
133
134 struct db_ctdb_ltdb_fetch_state {
135         struct ctdb_ltdb_header *header;
136         TALLOC_CTX *mem_ctx;
137         TDB_DATA *data;
138         bool oom;
139 };
140
141 static void db_ctdb_ltdb_fetch_parser(
142         TDB_DATA key, struct ctdb_ltdb_header *header,
143         TDB_DATA data, void *private_data)
144 {
145         struct db_ctdb_ltdb_fetch_state *state =
146                 (struct db_ctdb_ltdb_fetch_state *)private_data;
147
148         if (state->header != NULL) {
149                 memcpy(state->header, header, sizeof(struct ctdb_ltdb_header));
150         }
151         if (state->data == NULL) {
152                 return;
153         }
154         state->data->dsize = data.dsize;
155         if (data.dsize == 0) {
156                 state->data->dptr = NULL;
157                 return;
158         }
159         state->data->dptr = talloc_memdup(state->mem_ctx, data.dptr,
160                                           data.dsize);
161         if (state->data->dptr == NULL) {
162                 state->oom = true;
163                 return;
164         }
165 }
166
167 /**
168  * fetch a record from the tdb, separating out the header
169  * information and returning the body of the record.
170  */
171 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
172                                    TDB_DATA key,
173                                    struct ctdb_ltdb_header *header,
174                                    TALLOC_CTX *mem_ctx,
175                                    TDB_DATA *data)
176 {
177         struct db_ctdb_ltdb_fetch_state state;
178         NTSTATUS status;
179
180         state.header = header;
181         state.mem_ctx = mem_ctx;
182         state.data = data;
183         state.oom = false;
184
185         status = db_ctdb_ltdb_parse(db, key, db_ctdb_ltdb_fetch_parser,
186                                     &state);
187         if (!NT_STATUS_IS_OK(status)) {
188                 if (data) {
189                         ZERO_STRUCTP(data);
190                 }
191                 if (header) {
192                         header->dmaster = (uint32_t)-1;
193                         header->rsn = 0;
194                 }
195                 return status;
196         }
197         if (state.oom) {
198                 return NT_STATUS_NO_MEMORY;
199         }
200         return NT_STATUS_OK;
201 }
202
203 /*
204  * Store a record together with the ctdb record header
205  * in the local copy of the database.
206  */
207 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
208                                    TDB_DATA key,
209                                    struct ctdb_ltdb_header *header,
210                                    TDB_DATA data)
211 {
212         TALLOC_CTX *tmp_ctx = talloc_stackframe();
213         TDB_DATA rec;
214         int ret;
215
216         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
217         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
218
219         if (rec.dptr == NULL) {
220                 talloc_free(tmp_ctx);
221                 return NT_STATUS_NO_MEMORY;
222         }
223
224         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
225         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
226
227         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
228
229         talloc_free(tmp_ctx);
230
231         return (ret == 0) ? NT_STATUS_OK
232                           : tdb_error_to_ntstatus(db->wtdb->tdb);
233
234 }
235
236 /*
237   form a ctdb_rec_data record from a key/data pair
238  */
239 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
240                                                   TDB_DATA key,
241                                                   struct ctdb_ltdb_header *header,
242                                                   TDB_DATA data)
243 {
244         size_t length;
245         struct ctdb_rec_data *d;
246
247         length = offsetof(struct ctdb_rec_data, data) + key.dsize +
248                 data.dsize + sizeof(*header);
249         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
250         if (d == NULL) {
251                 return NULL;
252         }
253         d->length = length;
254         d->reqid = reqid;
255         d->keylen = key.dsize;
256         memcpy(&d->data[0], key.dptr, key.dsize);
257
258         d->datalen = data.dsize + sizeof(*header);
259         memcpy(&d->data[key.dsize], header, sizeof(*header));
260         memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
261         return d;
262 }
263
264
265 /* helper function for marshalling multiple records */
266 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
267                                                struct ctdb_marshall_buffer *m,
268                                                uint64_t db_id,
269                                                uint32_t reqid,
270                                                TDB_DATA key,
271                                                struct ctdb_ltdb_header *header,
272                                                TDB_DATA data)
273 {
274         struct ctdb_rec_data *r;
275         size_t m_size, r_size;
276         struct ctdb_marshall_buffer *m2 = NULL;
277
278         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
279         if (r == NULL) {
280                 talloc_free(m);
281                 return NULL;
282         }
283
284         if (m == NULL) {
285                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
286                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
287                 if (m == NULL) {
288                         goto done;
289                 }
290                 m->db_id = db_id;
291         }
292
293         m_size = talloc_get_size(m);
294         r_size = talloc_get_size(r);
295
296         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
297                 mem_ctx, m,  m_size + r_size);
298         if (m2 == NULL) {
299                 talloc_free(m);
300                 goto done;
301         }
302
303         memcpy(m_size + (uint8_t *)m2, r, r_size);
304
305         m2->count++;
306
307 done:
308         talloc_free(r);
309         return m2;
310 }
311
312 /* we've finished marshalling, return a data blob with the marshalled records */
313 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
314 {
315         TDB_DATA data;
316         data.dptr = (uint8_t *)m;
317         data.dsize = talloc_get_size(m);
318         return data;
319 }
320
321 /*
322    loop over a marshalling buffer
323
324      - pass r==NULL to start
325      - loop the number of times indicated by m->count
326 */
327 static struct ctdb_rec_data *db_ctdb_marshall_loop_next_key(
328         struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r, TDB_DATA *key)
329 {
330         if (r == NULL) {
331                 r = (struct ctdb_rec_data *)&m->data[0];
332         } else {
333                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
334         }
335
336         key->dptr   = &r->data[0];
337         key->dsize  = r->keylen;
338         return r;
339 }
340
341 /*
342    loop over a marshalling buffer
343
344      - pass r==NULL to start
345      - loop the number of times indicated by m->count
346 */
347 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
348                                                      uint32_t *reqid,
349                                                      struct ctdb_ltdb_header *header,
350                                                      TDB_DATA *key, TDB_DATA *data)
351 {
352         r = db_ctdb_marshall_loop_next_key(m, r, key);
353         if (r == NULL) {
354                 return NULL;
355         }
356
357         if (reqid != NULL) {
358                 *reqid = r->reqid;
359         }
360
361         if (data != NULL) {
362                 data->dptr  = &r->data[r->keylen];
363                 data->dsize = r->datalen;
364                 if (header != NULL) {
365                         data->dptr += sizeof(*header);
366                         data->dsize -= sizeof(*header);
367                 }
368         }
369
370         if (header != NULL) {
371                 if (r->datalen < sizeof(*header)) {
372                         return NULL;
373                 }
374                 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
375         }
376
377         return r;
378 }
379
380 /**
381  * CTDB transaction destructor
382  */
383 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
384 {
385         NTSTATUS status;
386
387         status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
388         if (!NT_STATUS_IS_OK(status)) {
389                 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
390                           nt_errstr(status)));
391                 return -1;
392         }
393         return 0;
394 }
395
396 /**
397  * CTDB dbwrap API: transaction_start function
398  * starts a transaction on a persistent database
399  */
400 static int db_ctdb_transaction_start(struct db_context *db)
401 {
402         struct db_ctdb_transaction_handle *h;
403         NTSTATUS status;
404         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
405                                                         struct db_ctdb_ctx);
406
407         if (!db->persistent) {
408                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
409                          ctx->db_id));
410                 return -1;
411         }
412
413         if (ctx->transaction) {
414                 ctx->transaction->nesting++;
415                 DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
416                           ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
417                 return 0;
418         }
419
420         h = talloc_zero(db, struct db_ctdb_transaction_handle);
421         if (h == NULL) {
422                 DEBUG(0,(__location__ " oom for transaction handle\n"));
423                 return -1;
424         }
425
426         h->ctx = ctx;
427
428         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
429                                        (unsigned int)ctx->db_id);
430         if (h->lock_name == NULL) {
431                 DEBUG(0, ("talloc_asprintf failed\n"));
432                 TALLOC_FREE(h);
433                 return -1;
434         }
435
436         /*
437          * Wait a day, i.e. forever...
438          */
439         status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
440                              timeval_set(86400, 0));
441         if (!NT_STATUS_IS_OK(status)) {
442                 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
443                 TALLOC_FREE(h);
444                 return -1;
445         }
446
447         talloc_set_destructor(h, db_ctdb_transaction_destructor);
448
449         ctx->transaction = h;
450
451         DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
452
453         return 0;
454 }
455
456 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
457                                              TDB_DATA key,
458                                              struct ctdb_ltdb_header *pheader,
459                                              TALLOC_CTX *mem_ctx,
460                                              TDB_DATA *pdata)
461 {
462         struct ctdb_rec_data *rec = NULL;
463         struct ctdb_ltdb_header h;
464         bool found = false;
465         TDB_DATA data;
466         int i;
467
468         if (buf == NULL) {
469                 return false;
470         }
471
472         ZERO_STRUCT(h);
473         ZERO_STRUCT(data);
474
475         /*
476          * Walk the list of records written during this
477          * transaction. If we want to read one we have already
478          * written, return the last written sample. Thus we do not do
479          * a "break;" for the first hit, this record might have been
480          * overwritten later.
481          */
482
483         for (i=0; i<buf->count; i++) {
484                 TDB_DATA tkey, tdata;
485                 uint32_t reqid;
486                 struct ctdb_ltdb_header hdr;
487
488                 ZERO_STRUCT(hdr);
489
490                 rec = db_ctdb_marshall_loop_next(buf, rec, &reqid, &hdr, &tkey,
491                                                  &tdata);
492                 if (rec == NULL) {
493                         return false;
494                 }
495
496                 if (tdb_data_equal(key, tkey)) {
497                         found = true;
498                         data = tdata;
499                         h = hdr;
500                 }
501         }
502
503         if (!found) {
504                 return false;
505         }
506
507         if (pdata != NULL) {
508                 data.dptr = (uint8_t *)talloc_memdup(mem_ctx, data.dptr,
509                                                      data.dsize);
510                 if ((data.dsize != 0) && (data.dptr == NULL)) {
511                         return false;
512                 }
513                 *pdata = data;
514         }
515
516         if (pheader != NULL) {
517                 *pheader = h;
518         }
519
520         return true;
521 }
522
523 /*
524   fetch a record inside a transaction
525  */
526 static NTSTATUS db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
527                                           TALLOC_CTX *mem_ctx,
528                                           TDB_DATA key, TDB_DATA *data)
529 {
530         struct db_ctdb_transaction_handle *h = db->transaction;
531         NTSTATUS status;
532         bool found;
533
534         found = pull_newest_from_marshall_buffer(h->m_write, key, NULL,
535                                                  mem_ctx, data);
536         if (found) {
537                 return NT_STATUS_OK;
538         }
539
540         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
541
542         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
543                 *data = tdb_null;
544         }
545
546         return status;
547 }
548
549 /**
550  * Fetch a record from a persistent database
551  * without record locking and without an active transaction.
552  *
553  * This just fetches from the local database copy.
554  * Since the databases are kept in syc cluster-wide,
555  * there is no point in doing a ctdb call to fetch the
556  * record from the lmaster. It does even harm since migration
557  * of records bump their RSN and hence render the persistent
558  * database inconsistent.
559  */
560 static NTSTATUS db_ctdb_fetch_persistent(struct db_ctdb_ctx *db,
561                                          TALLOC_CTX *mem_ctx,
562                                          TDB_DATA key, TDB_DATA *data)
563 {
564         NTSTATUS status;
565
566         status = db_ctdb_ltdb_fetch(db, key, NULL, mem_ctx, data);
567
568         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
569                 *data = tdb_null;
570         }
571
572         return status;
573 }
574
575 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
576 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
577
578 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
579                                                           TALLOC_CTX *mem_ctx,
580                                                           TDB_DATA key)
581 {
582         struct db_record *result;
583         TDB_DATA ctdb_data;
584
585         if (!(result = talloc(mem_ctx, struct db_record))) {
586                 DEBUG(0, ("talloc failed\n"));
587                 return NULL;
588         }
589
590         result->private_data = ctx->transaction;
591
592         result->key.dsize = key.dsize;
593         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
594                                                     key.dsize);
595         if (result->key.dptr == NULL) {
596                 DEBUG(0, ("talloc failed\n"));
597                 TALLOC_FREE(result);
598                 return NULL;
599         }
600
601         result->store = db_ctdb_store_transaction;
602         result->delete_rec = db_ctdb_delete_transaction;
603
604         if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
605                                              NULL, result, &result->value)) {
606                 return result;
607         }
608
609         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
610         if (ctdb_data.dptr == NULL) {
611                 /* create the record */
612                 result->value = tdb_null;
613                 return result;
614         }
615
616         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
617         result->value.dptr = NULL;
618
619         if ((result->value.dsize != 0)
620             && !(result->value.dptr = (uint8_t *)talloc_memdup(
621                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
622                          result->value.dsize))) {
623                 DEBUG(0, ("talloc failed\n"));
624                 TALLOC_FREE(result);
625         }
626
627         SAFE_FREE(ctdb_data.dptr);
628
629         return result;
630 }
631
632 static int db_ctdb_record_destructor(struct db_record **recp)
633 {
634         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
635         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
636                 rec->private_data, struct db_ctdb_transaction_handle);
637         int ret = h->ctx->db->transaction_commit(h->ctx->db);
638         if (ret != 0) {
639                 DEBUG(0,(__location__ " transaction_commit failed\n"));
640         }
641         return 0;
642 }
643
644 /*
645   auto-create a transaction for persistent databases
646  */
647 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
648                                                          TALLOC_CTX *mem_ctx,
649                                                          TDB_DATA key)
650 {
651         int res;
652         struct db_record *rec, **recp;
653
654         res = db_ctdb_transaction_start(ctx->db);
655         if (res == -1) {
656                 return NULL;
657         }
658
659         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
660         if (rec == NULL) {
661                 ctx->db->transaction_cancel(ctx->db);
662                 return NULL;
663         }
664
665         /* destroy this transaction when we release the lock */
666         recp = talloc(rec, struct db_record *);
667         if (recp == NULL) {
668                 ctx->db->transaction_cancel(ctx->db);
669                 talloc_free(rec);
670                 return NULL;
671         }
672         *recp = rec;
673         talloc_set_destructor(recp, db_ctdb_record_destructor);
674         return rec;
675 }
676
677
678 /*
679   stores a record inside a transaction
680  */
681 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
682                                           TDB_DATA key, TDB_DATA data)
683 {
684         TALLOC_CTX *tmp_ctx = talloc_new(h);
685         TDB_DATA rec;
686         struct ctdb_ltdb_header header;
687
688         ZERO_STRUCT(header);
689
690         /* we need the header so we can update the RSN */
691
692         if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
693                                               NULL, NULL)) {
694
695                 rec = tdb_fetch_compat(h->ctx->wtdb->tdb, key);
696
697                 if (rec.dptr != NULL) {
698                         memcpy(&header, rec.dptr,
699                                sizeof(struct ctdb_ltdb_header));
700                         rec.dsize -= sizeof(struct ctdb_ltdb_header);
701
702                         /*
703                          * a special case, we are writing the same
704                          * data that is there now
705                          */
706                         if (data.dsize == rec.dsize &&
707                             memcmp(data.dptr,
708                                    rec.dptr + sizeof(struct ctdb_ltdb_header),
709                                    data.dsize) == 0) {
710                                 SAFE_FREE(rec.dptr);
711                                 talloc_free(tmp_ctx);
712                                 return NT_STATUS_OK;
713                         }
714                 }
715                 SAFE_FREE(rec.dptr);
716         }
717
718         header.dmaster = get_my_vnn();
719         header.rsn++;
720
721         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
722         if (h->m_write == NULL) {
723                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
724                 talloc_free(tmp_ctx);
725                 return NT_STATUS_NO_MEMORY;
726         }
727
728         talloc_free(tmp_ctx);
729         return NT_STATUS_OK;
730 }
731
732
733 /* 
734    a record store inside a transaction
735  */
736 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
737 {
738         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
739                 rec->private_data, struct db_ctdb_transaction_handle);
740         NTSTATUS status;
741
742         status = db_ctdb_transaction_store(h, rec->key, data);
743         return status;
744 }
745
746 /*
747    a record delete inside a transaction
748  */
749 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
750 {
751         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
752                 rec->private_data, struct db_ctdb_transaction_handle);
753         NTSTATUS status;
754
755         status =  db_ctdb_transaction_store(h, rec->key, tdb_null);
756         return status;
757 }
758
759 static void db_ctdb_fetch_db_seqnum_parser(
760         TDB_DATA key, struct ctdb_ltdb_header *header,
761         TDB_DATA data, void *private_data)
762 {
763         uint64_t *seqnum = (uint64_t *)private_data;
764
765         if (data.dsize != sizeof(uint64_t)) {
766                 *seqnum = 0;
767                 return;
768         }
769         memcpy(seqnum, data.dptr, sizeof(*seqnum));
770 }
771
772 /**
773  * Fetch the db sequence number of a persistent db directly from the db.
774  */
775 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
776                                                 uint64_t *seqnum)
777 {
778         NTSTATUS status;
779         TDB_DATA key;
780
781         if (seqnum == NULL) {
782                 return NT_STATUS_INVALID_PARAMETER;
783         }
784
785         key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
786
787         status = db_ctdb_ltdb_parse(
788                 db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
789
790         if (NT_STATUS_IS_OK(status)) {
791                 return NT_STATUS_OK;
792         }
793         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
794                 *seqnum = 0;
795                 return NT_STATUS_OK;
796         }
797         return status;
798 }
799
800 /**
801  * Store the database sequence number inside a transaction.
802  */
803 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
804                                         uint64_t seqnum)
805 {
806         NTSTATUS status;
807         const char *keyname = CTDB_DB_SEQNUM_KEY;
808         TDB_DATA key;
809         TDB_DATA data;
810
811         key = string_term_tdb_data(keyname);
812
813         data.dptr = (uint8_t *)&seqnum;
814         data.dsize = sizeof(uint64_t);
815
816         status = db_ctdb_transaction_store(h, key, data);
817
818         return status;
819 }
820
821 /*
822   commit a transaction
823  */
824 static int db_ctdb_transaction_commit(struct db_context *db)
825 {
826         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
827                                                         struct db_ctdb_ctx);
828         NTSTATUS rets;
829         int status;
830         struct db_ctdb_transaction_handle *h = ctx->transaction;
831         uint64_t old_seqnum, new_seqnum;
832         int ret;
833
834         if (h == NULL) {
835                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
836                 return -1;
837         }
838
839         if (h->nested_cancel) {
840                 db->transaction_cancel(db);
841                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
842                 return -1;
843         }
844
845         if (h->nesting != 0) {
846                 h->nesting--;
847                 DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
848                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
849                 return 0;
850         }
851
852         if (h->m_write == NULL) {
853                 /*
854                  * No changes were made, so don't change the seqnum,
855                  * don't push to other node, just exit with success.
856                  */
857                 ret = 0;
858                 goto done;
859         }
860
861         DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));
862
863         /*
864          * As the last db action before committing, bump the database sequence
865          * number. Note that this undoes all changes to the seqnum records
866          * performed under the transaction. This record is not meant to be
867          * modified by user interaction. It is for internal use only...
868          */
869         rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
870         if (!NT_STATUS_IS_OK(rets)) {
871                 DEBUG(1, (__location__ " failed to fetch the db sequence number "
872                           "in transaction commit on db 0x%08x\n", ctx->db_id));
873                 ret = -1;
874                 goto done;
875         }
876
877         new_seqnum = old_seqnum + 1;
878
879         rets = db_ctdb_store_db_seqnum(h, new_seqnum);
880         if (!NT_STATUS_IS_OK(rets)) {
881                 DEBUG(1, (__location__ "failed to store the db sequence number "
882                           " in transaction commit on db 0x%08x\n", ctx->db_id));
883                 ret = -1;
884                 goto done;
885         }
886
887 again:
888         /* tell ctdbd to commit to the other nodes */
889         rets = ctdbd_control_local(messaging_ctdbd_connection(),
890                                    CTDB_CONTROL_TRANS3_COMMIT,
891                                    h->ctx->db_id, 0,
892                                    db_ctdb_marshall_finish(h->m_write),
893                                    NULL, NULL, &status);
894         if (!NT_STATUS_IS_OK(rets) || status != 0) {
895                 /*
896                  * The TRANS3_COMMIT control should only possibly fail when a
897                  * recovery has been running concurrently. In any case, the db
898                  * will be the same on all nodes, either the new copy or the
899                  * old copy.  This can be detected by comparing the old and new
900                  * local sequence numbers.
901                  */
902                 rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
903                 if (!NT_STATUS_IS_OK(rets)) {
904                         DEBUG(1, (__location__ " failed to refetch db sequence "
905                                   "number after failed TRANS3_COMMIT\n"));
906                         ret = -1;
907                         goto done;
908                 }
909
910                 if (new_seqnum == old_seqnum) {
911                         /* Recovery prevented all our changes: retry. */
912                         goto again;
913                 }
914                 if (new_seqnum != (old_seqnum + 1)) {
915                         DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
916                                   "old_seqnum[%lu] + (0 or 1) after failed "
917                                   "TRANS3_COMMIT - this should not happen!\n",
918                                   (unsigned long)new_seqnum,
919                                   (unsigned long)old_seqnum));
920                         ret = -1;
921                         goto done;
922                 }
923                 /*
924                  * Recovery propagated our changes to all nodes, completing
925                  * our commit for us - succeed.
926                  */
927         }
928
929         ret = 0;
930
931 done:
932         h->ctx->transaction = NULL;
933         talloc_free(h);
934         return ret;
935 }
936
937
938 /*
939   cancel a transaction
940  */
941 static int db_ctdb_transaction_cancel(struct db_context *db)
942 {
943         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
944                                                         struct db_ctdb_ctx);
945         struct db_ctdb_transaction_handle *h = ctx->transaction;
946
947         if (h == NULL) {
948                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
949                 return -1;
950         }
951
952         if (h->nesting != 0) {
953                 h->nesting--;
954                 h->nested_cancel = true;
955                 DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
956                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
957                 return 0;
958         }
959
960         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
961
962         ctx->transaction = NULL;
963         talloc_free(h);
964         return 0;
965 }
966
967
968 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
969 {
970         struct db_ctdb_rec *crec = talloc_get_type_abort(
971                 rec->private_data, struct db_ctdb_rec);
972
973         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
974 }
975
976
977
978 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
979 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
980 {
981         NTSTATUS status;
982         struct ctdb_control_schedule_for_deletion *dd;
983         TDB_DATA indata;
984         int cstatus;
985         struct db_ctdb_rec *crec = talloc_get_type_abort(
986                 rec->private_data, struct db_ctdb_rec);
987
988         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + rec->key.dsize;
989         indata.dptr = talloc_zero_array(crec, uint8_t, indata.dsize);
990         if (indata.dptr == NULL) {
991                 DEBUG(0, (__location__ " talloc failed!\n"));
992                 return NT_STATUS_NO_MEMORY;
993         }
994
995         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
996         dd->db_id = crec->ctdb_ctx->db_id;
997         dd->hdr = crec->header;
998         dd->keylen = rec->key.dsize;
999         memcpy(dd->key, rec->key.dptr, rec->key.dsize);
1000
1001         status = ctdbd_control_local(messaging_ctdbd_connection(),
1002                                      CTDB_CONTROL_SCHEDULE_FOR_DELETION,
1003                                      crec->ctdb_ctx->db_id,
1004                                      CTDB_CTRL_FLAG_NOREPLY, /* flags */
1005                                      indata,
1006                                      NULL, /* outdata */
1007                                      NULL, /* errmsg */
1008                                      &cstatus);
1009         talloc_free(indata.dptr);
1010
1011         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
1012                 DEBUG(1, (__location__ " Error sending local control "
1013                           "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
1014                           nt_errstr(status), cstatus));
1015                 if (NT_STATUS_IS_OK(status)) {
1016                         status = NT_STATUS_UNSUCCESSFUL;
1017                 }
1018         }
1019
1020         return status;
1021 }
1022 #endif
1023
1024 static NTSTATUS db_ctdb_delete(struct db_record *rec)
1025 {
1026         TDB_DATA data;
1027         NTSTATUS status;
1028
1029         /*
1030          * We have to store the header with empty data. TODO: Fix the
1031          * tdb-level cleanup
1032          */
1033
1034         ZERO_STRUCT(data);
1035
1036         status = db_ctdb_store(rec, data, 0);
1037         if (!NT_STATUS_IS_OK(status)) {
1038                 return status;
1039         }
1040
1041 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
1042         status = db_ctdb_send_schedule_for_deletion(rec);
1043 #endif
1044
1045         return status;
1046 }
1047
1048 static int db_ctdb_record_destr(struct db_record* data)
1049 {
1050         struct db_ctdb_rec *crec = talloc_get_type_abort(
1051                 data->private_data, struct db_ctdb_rec);
1052         int threshold;
1053
1054         DEBUG(10, (DEBUGLEVEL > 10
1055                    ? "Unlocking db %u key %s\n"
1056                    : "Unlocking db %u key %.20s\n",
1057                    (int)crec->ctdb_ctx->db_id,
1058                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
1059                               data->key.dsize)));
1060
1061         tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
1062
1063         threshold = lp_ctdb_locktime_warn_threshold();
1064         if (threshold != 0) {
1065                 double timediff = timeval_elapsed(&crec->lock_time);
1066                 if ((timediff * 1000) > threshold) {
1067                         const char *key;
1068
1069                         key = hex_encode_talloc(data,
1070                                                 (unsigned char *)data->key.dptr,
1071                                                 data->key.dsize);
1072                         DEBUG(0, ("Held tdb lock on db %s, key %s %f seconds\n",
1073                                   tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
1074                                   timediff));
1075                 }
1076         }
1077
1078         return 0;
1079 }
1080
1081 /**
1082  * Check whether we have a valid local copy of the given record,
1083  * either for reading or for writing.
1084  */
1085 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, bool read_only)
1086 {
1087         struct ctdb_ltdb_header *hdr;
1088
1089         if (ctdb_data.dptr == NULL)
1090                 return false;
1091
1092         if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header))
1093                 return false;
1094
1095         hdr = (struct ctdb_ltdb_header *)ctdb_data.dptr;
1096
1097 #ifdef HAVE_CTDB_WANT_READONLY_DECL
1098         if (hdr->dmaster != get_my_vnn()) {
1099                 /* If we're not dmaster, it must be r/o copy. */
1100                 return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
1101         }
1102
1103         /*
1104          * If we want write access, no one may have r/o copies.
1105          */
1106         return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
1107 #else
1108         return (hdr->dmaster == get_my_vnn());
1109 #endif
1110 }
1111
1112 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
1113                                                TALLOC_CTX *mem_ctx,
1114                                                TDB_DATA key,
1115                                                bool tryonly)
1116 {
1117         struct db_record *result;
1118         struct db_ctdb_rec *crec;
1119         NTSTATUS status;
1120         TDB_DATA ctdb_data;
1121         int migrate_attempts = 0;
1122         int lockret;
1123
1124         if (!(result = talloc(mem_ctx, struct db_record))) {
1125                 DEBUG(0, ("talloc failed\n"));
1126                 return NULL;
1127         }
1128
1129         if (!(crec = talloc_zero(result, struct db_ctdb_rec))) {
1130                 DEBUG(0, ("talloc failed\n"));
1131                 TALLOC_FREE(result);
1132                 return NULL;
1133         }
1134
1135         result->db = ctx->db;
1136         result->private_data = (void *)crec;
1137         crec->ctdb_ctx = ctx;
1138
1139         result->key.dsize = key.dsize;
1140         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
1141                                                     key.dsize);
1142         if (result->key.dptr == NULL) {
1143                 DEBUG(0, ("talloc failed\n"));
1144                 TALLOC_FREE(result);
1145                 return NULL;
1146         }
1147
1148         /*
1149          * Do a blocking lock on the record
1150          */
1151 again:
1152
1153         if (DEBUGLEVEL >= 10) {
1154                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1155                 DEBUG(10, (DEBUGLEVEL > 10
1156                            ? "Locking db %u key %s\n"
1157                            : "Locking db %u key %.20s\n",
1158                            (int)crec->ctdb_ctx->db_id, keystr));
1159                 TALLOC_FREE(keystr);
1160         }
1161
1162         lockret = tryonly
1163                 ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
1164                 : tdb_chainlock(ctx->wtdb->tdb, key);
1165         if (lockret != 0) {
1166                 DEBUG(3, ("tdb_chainlock failed\n"));
1167                 TALLOC_FREE(result);
1168                 return NULL;
1169         }
1170
1171         result->store = db_ctdb_store;
1172         result->delete_rec = db_ctdb_delete;
1173         talloc_set_destructor(result, db_ctdb_record_destr);
1174
1175         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
1176
1177         /*
1178          * See if we have a valid record and we are the dmaster. If so, we can
1179          * take the shortcut and just return it.
1180          */
1181
1182         if (!db_ctdb_can_use_local_copy(ctdb_data, false)) {
1183                 SAFE_FREE(ctdb_data.dptr);
1184                 tdb_chainunlock(ctx->wtdb->tdb, key);
1185                 talloc_set_destructor(result, NULL);
1186
1187                 if (tryonly && (migrate_attempts != 0)) {
1188                         DEBUG(5, ("record migrated away again\n"));
1189                         TALLOC_FREE(result);
1190                         return NULL;
1191                 }
1192
1193                 migrate_attempts += 1;
1194
1195                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
1196                            ctdb_data.dptr, ctdb_data.dptr ?
1197                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1198                            get_my_vnn(),
1199                            ctdb_data.dptr ?
1200                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
1201
1202                 status = ctdbd_migrate(messaging_ctdbd_connection(), ctx->db_id,
1203                                        key);
1204                 if (!NT_STATUS_IS_OK(status)) {
1205                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1206                                   nt_errstr(status)));
1207                         TALLOC_FREE(result);
1208                         return NULL;
1209                 }
1210                 /* now its migrated, try again */
1211                 goto again;
1212         }
1213
1214         if (migrate_attempts > 10) {
1215                 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
1216                           "attempts\n", tdb_name(ctx->wtdb->tdb),
1217                           hex_encode_talloc(talloc_tos(),
1218                                             (unsigned char *)key.dptr,
1219                                             key.dsize),
1220                           migrate_attempts));
1221         }
1222
1223         GetTimeOfDay(&crec->lock_time);
1224
1225         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1226
1227         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1228         result->value.dptr = NULL;
1229
1230         if ((result->value.dsize != 0)
1231             && !(result->value.dptr = (uint8_t *)talloc_memdup(
1232                          result, ctdb_data.dptr + sizeof(crec->header),
1233                          result->value.dsize))) {
1234                 DEBUG(0, ("talloc failed\n"));
1235                 TALLOC_FREE(result);
1236         }
1237
1238         SAFE_FREE(ctdb_data.dptr);
1239
1240         return result;
1241 }
1242
1243 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1244                                               TALLOC_CTX *mem_ctx,
1245                                               TDB_DATA key)
1246 {
1247         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1248                                                         struct db_ctdb_ctx);
1249
1250         if (ctx->transaction != NULL) {
1251                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1252         }
1253
1254         if (db->persistent) {
1255                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1256         }
1257
1258         return fetch_locked_internal(ctx, mem_ctx, key, false);
1259 }
1260
1261 static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
1262                                                   TALLOC_CTX *mem_ctx,
1263                                                   TDB_DATA key)
1264 {
1265         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1266                                                         struct db_ctdb_ctx);
1267
1268         if (ctx->transaction != NULL) {
1269                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1270         }
1271
1272         if (db->persistent) {
1273                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1274         }
1275
1276         return fetch_locked_internal(ctx, mem_ctx, key, true);
1277 }
1278
1279 /*
1280   fetch (unlocked, no migration) operation on ctdb
1281  */
1282 static NTSTATUS db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1283                               TDB_DATA key, TDB_DATA *data)
1284 {
1285         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1286                                                         struct db_ctdb_ctx);
1287         NTSTATUS status;
1288         TDB_DATA ctdb_data;
1289
1290         if (ctx->transaction) {
1291                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1292         }
1293
1294         if (db->persistent) {
1295                 return db_ctdb_fetch_persistent(ctx, mem_ctx, key, data);
1296         }
1297
1298         /* try a direct fetch */
1299         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
1300
1301         /*
1302          * See if we have a valid record and we are the dmaster. If so, we can
1303          * take the shortcut and just return it.
1304          * we bypass the dmaster check for persistent databases
1305          */
1306         if (db_ctdb_can_use_local_copy(ctdb_data, true)) {
1307                 /*
1308                  * We have a valid local copy - avoid the ctdb protocol op
1309                  */
1310                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1311
1312                 data->dptr = (uint8_t *)talloc_memdup(
1313                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1314                         data->dsize);
1315
1316                 SAFE_FREE(ctdb_data.dptr);
1317
1318                 if (data->dptr == NULL) {
1319                         return NT_STATUS_NO_MEMORY;
1320                 }
1321                 return NT_STATUS_OK;
1322         }
1323
1324         SAFE_FREE(ctdb_data.dptr);
1325
1326         /*
1327          * We weren't able to get it locally - ask ctdb to fetch it for us.
1328          * If we already had *something*, it's probably worth making a local
1329          * read-only copy.
1330          */
1331         status = ctdbd_fetch(messaging_ctdbd_connection(), ctx->db_id, key,
1332                              mem_ctx, data,
1333                              ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header));
1334         if (!NT_STATUS_IS_OK(status)) {
1335                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1336         }
1337
1338         return status;
1339 }
1340
1341 static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
1342                                      void (*parser)(TDB_DATA key,
1343                                                     TDB_DATA data,
1344                                                     void *private_data),
1345                                      void *private_data)
1346 {
1347         NTSTATUS status;
1348         TDB_DATA data;
1349
1350         status = db_ctdb_fetch(db, talloc_tos(), key, &data);
1351         if (!NT_STATUS_IS_OK(status)) {
1352                 return status;
1353         }
1354         parser(key, data, private_data);
1355         TALLOC_FREE(data.dptr);
1356         return NT_STATUS_OK;
1357 }
1358
1359 struct traverse_state {
1360         struct db_context *db;
1361         int (*fn)(struct db_record *rec, void *private_data);
1362         void *private_data;
1363         int count;
1364 };
1365
1366 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1367 {
1368         struct traverse_state *state = (struct traverse_state *)private_data;
1369         struct db_record *rec;
1370         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1371         /* we have to give them a locked record to prevent races */
1372         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1373         if (rec && rec->value.dsize > 0) {
1374                 state->fn(rec, state->private_data);
1375         }
1376         talloc_free(tmp_ctx);
1377 }
1378
1379 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1380                                         void *private_data)
1381 {
1382         struct traverse_state *state = (struct traverse_state *)private_data;
1383         struct db_record *rec;
1384         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1385         int ret = 0;
1386
1387         /*
1388          * Skip the __db_sequence_number__ key:
1389          * This is used for persistent transactions internally.
1390          */
1391         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1392             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1393         {
1394                 goto done;
1395         }
1396
1397         /* we have to give them a locked record to prevent races */
1398         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1399         if (rec && rec->value.dsize > 0) {
1400                 ret = state->fn(rec, state->private_data);
1401         }
1402
1403 done:
1404         talloc_free(tmp_ctx);
1405         return ret;
1406 }
1407
1408 /* wrapper to use traverse_persistent_callback with dbwrap */
1409 static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
1410 {
1411         return traverse_persistent_callback(NULL, rec->key, rec->value, data);
1412 }
1413
1414
1415 static int db_ctdb_traverse(struct db_context *db,
1416                             int (*fn)(struct db_record *rec,
1417                                       void *private_data),
1418                             void *private_data)
1419 {
1420         NTSTATUS status;
1421         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1422                                                         struct db_ctdb_ctx);
1423         struct traverse_state state;
1424
1425         state.db = db;
1426         state.fn = fn;
1427         state.private_data = private_data;
1428         state.count = 0;
1429
1430         if (db->persistent) {
1431                 struct tdb_context *ltdb = ctx->wtdb->tdb;
1432                 int ret;
1433
1434                 /* for persistent databases we don't need to do a ctdb traverse,
1435                    we can do a faster local traverse */
1436                 ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
1437                 if (ret < 0) {
1438                         return ret;
1439                 }
1440                 if (ctx->transaction && ctx->transaction->m_write) {
1441                         /*
1442                          * we now have to handle keys not yet
1443                          * present at transaction start
1444                          */
1445                         struct db_context *newkeys = db_open_rbt(talloc_tos());
1446                         struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
1447                         struct ctdb_rec_data *rec=NULL;
1448                         int i;
1449                         int count = 0;
1450
1451                         if (newkeys == NULL) {
1452                                 return -1;
1453                         }
1454
1455                         for (i=0; i<mbuf->count; i++) {
1456                                 TDB_DATA key;
1457                                 rec = db_ctdb_marshall_loop_next_key(
1458                                         mbuf, rec, &key);
1459                                 SMB_ASSERT(rec != NULL);
1460
1461                                 if (!tdb_exists(ltdb, key)) {
1462                                         dbwrap_store(newkeys, key, tdb_null, 0);
1463                                 }
1464                         }
1465                         status = dbwrap_traverse(newkeys,
1466                                                  traverse_persistent_callback_dbwrap,
1467                                                  &state,
1468                                                  &count);
1469                         talloc_free(newkeys);
1470                         if (!NT_STATUS_IS_OK(status)) {
1471                                 return -1;
1472                         }
1473                         ret += count;
1474                 }
1475                 return ret;
1476         }
1477
1478         status = ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1479         if (!NT_STATUS_IS_OK(status)) {
1480                 return -1;
1481         }
1482         return state.count;
1483 }
1484
1485 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1486 {
1487         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1488 }
1489
1490 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1491 {
1492         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1493 }
1494
1495 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1496 {
1497         struct traverse_state *state = (struct traverse_state *)private_data;
1498         struct db_record rec;
1499         rec.key = key;
1500         rec.value = data;
1501         rec.store = db_ctdb_store_deny;
1502         rec.delete_rec = db_ctdb_delete_deny;
1503         rec.private_data = state->db;
1504         state->fn(&rec, state->private_data);
1505         state->count++;
1506 }
1507
1508 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1509                                         void *private_data)
1510 {
1511         struct traverse_state *state = (struct traverse_state *)private_data;
1512         struct db_record rec;
1513
1514         /*
1515          * Skip the __db_sequence_number__ key:
1516          * This is used for persistent transactions internally.
1517          */
1518         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1519             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1520         {
1521                 return 0;
1522         }
1523
1524         rec.key = kbuf;
1525         rec.value = dbuf;
1526         rec.store = db_ctdb_store_deny;
1527         rec.delete_rec = db_ctdb_delete_deny;
1528         rec.private_data = state->db;
1529
1530         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1531                 /* a deleted record */
1532                 return 0;
1533         }
1534         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1535         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1536
1537         state->count++;
1538         return state->fn(&rec, state->private_data);
1539 }
1540
1541 static int db_ctdb_traverse_read(struct db_context *db,
1542                                  int (*fn)(struct db_record *rec,
1543                                            void *private_data),
1544                                  void *private_data)
1545 {
1546         NTSTATUS status;
1547         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1548                                                         struct db_ctdb_ctx);
1549         struct traverse_state state;
1550
1551         state.db = db;
1552         state.fn = fn;
1553         state.private_data = private_data;
1554         state.count = 0;
1555
1556         if (db->persistent) {
1557                 /* for persistent databases we don't need to do a ctdb traverse,
1558                    we can do a faster local traverse */
1559                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1560         }
1561
1562         status = ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1563         if (!NT_STATUS_IS_OK(status)) {
1564                 return -1;
1565         }
1566         return state.count;
1567 }
1568
1569 static int db_ctdb_get_seqnum(struct db_context *db)
1570 {
1571         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1572                                                         struct db_ctdb_ctx);
1573         return tdb_get_seqnum(ctx->wtdb->tdb);
1574 }
1575
1576 static void db_ctdb_id(struct db_context *db, const uint8_t **id,
1577                        size_t *idlen)
1578 {
1579         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1580                 db->private_data, struct db_ctdb_ctx);
1581
1582         *id = (uint8_t *)&ctx->db_id;
1583         *idlen = sizeof(ctx->db_id);
1584 }
1585
1586 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1587                                 const char *name,
1588                                 int hash_size, int tdb_flags,
1589                                 int open_flags, mode_t mode,
1590                                 enum dbwrap_lock_order lock_order)
1591 {
1592         struct db_context *result;
1593         struct db_ctdb_ctx *db_ctdb;
1594         char *db_path;
1595         struct ctdbd_connection *conn;
1596         struct loadparm_context *lp_ctx;
1597         struct ctdb_db_priority prio;
1598         NTSTATUS status;
1599         int cstatus;
1600
1601         if (!lp_clustering()) {
1602                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1603                 return NULL;
1604         }
1605
1606         if (!(result = talloc_zero(mem_ctx, struct db_context))) {
1607                 DEBUG(0, ("talloc failed\n"));
1608                 TALLOC_FREE(result);
1609                 return NULL;
1610         }
1611
1612         if (!(db_ctdb = talloc(result, struct db_ctdb_ctx))) {
1613                 DEBUG(0, ("talloc failed\n"));
1614                 TALLOC_FREE(result);
1615                 return NULL;
1616         }
1617
1618         db_ctdb->transaction = NULL;
1619         db_ctdb->db = result;
1620
1621         conn = messaging_ctdbd_connection();
1622         if (conn == NULL) {
1623                 DEBUG(1, ("Could not connect to ctdb\n"));
1624                 TALLOC_FREE(result);
1625                 return NULL;
1626         }
1627
1628         if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn, name, &db_ctdb->db_id, tdb_flags))) {
1629                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1630                 TALLOC_FREE(result);
1631                 return NULL;
1632         }
1633
1634         db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id);
1635
1636         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1637         result->lock_order = lock_order;
1638
1639         /* only pass through specific flags */
1640         tdb_flags &= TDB_SEQNUM;
1641
1642         /* honor permissions if user has specified O_CREAT */
1643         if (open_flags & O_CREAT) {
1644                 chmod(db_path, mode);
1645         }
1646
1647         prio.db_id = db_ctdb->db_id;
1648         prio.priority = lock_order;
1649
1650         status = ctdbd_control_local(
1651                 conn, CTDB_CONTROL_SET_DB_PRIORITY, 0, 0,
1652                 make_tdb_data((uint8_t *)&prio, sizeof(prio)),
1653                 NULL, NULL, &cstatus);
1654
1655         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1656                 DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
1657                           nt_errstr(status), cstatus));
1658                 TALLOC_FREE(result);
1659                 return NULL;
1660         }
1661
1662         lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
1663
1664         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags,
1665                                       O_RDWR, 0, lp_ctx);
1666         talloc_unlink(db_path, lp_ctx);
1667         if (db_ctdb->wtdb == NULL) {
1668                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1669                 TALLOC_FREE(result);
1670                 return NULL;
1671         }
1672         talloc_free(db_path);
1673
1674         if (result->persistent) {
1675                 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb,
1676                                                     ctdb_conn_msg_ctx(conn));
1677                 if (db_ctdb->lock_ctx == NULL) {
1678                         DEBUG(0, ("g_lock_ctx_init failed\n"));
1679                         TALLOC_FREE(result);
1680                         return NULL;
1681                 }
1682         }
1683
1684         result->private_data = (void *)db_ctdb;
1685         result->fetch_locked = db_ctdb_fetch_locked;
1686         result->try_fetch_locked = db_ctdb_try_fetch_locked;
1687         result->parse_record = db_ctdb_parse_record;
1688         result->traverse = db_ctdb_traverse;
1689         result->traverse_read = db_ctdb_traverse_read;
1690         result->get_seqnum = db_ctdb_get_seqnum;
1691         result->transaction_start = db_ctdb_transaction_start;
1692         result->transaction_commit = db_ctdb_transaction_commit;
1693         result->transaction_cancel = db_ctdb_transaction_cancel;
1694         result->id = db_ctdb_id;
1695         result->stored_callback = NULL;
1696
1697         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1698                  name, db_ctdb->db_id));
1699
1700         return result;
1701 }
1702
1703 #else /* CLUSTER_SUPPORT */
1704
1705 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1706                                 const char *name,
1707                                 int hash_size, int tdb_flags,
1708                                 int open_flags, mode_t mode,
1709                                 enum dbwrap_lock_order lock_order)
1710 {
1711         DEBUG(3, ("db_open_ctdb: no cluster support!\n"));
1712         errno = ENOSYS;
1713         return NULL;
1714 }
1715
1716 #endif