s3: Factor out parse_newest_in_marshall_buffer from pull_newest_from_marshall_buffer
[kai/samba.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
1 /*
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007-2009
5    Copyright (C) Michael Adam 2009
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
24 #include "util_tdb.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
29
30 #ifdef CLUSTER_SUPPORT
31
32 /*
33  * It is not possible to include ctdb.h and tdb_compat.h (included via
34  * some other include above) without warnings. This fixes those
35  * warnings.
36  */
37
38 #ifdef typesafe_cb
39 #undef typesafe_cb
40 #endif
41
42 #ifdef typesafe_cb_preargs
43 #undef typesafe_cb_preargs
44 #endif
45
46 #ifdef typesafe_cb_postargs
47 #undef typesafe_cb_postargs
48 #endif
49
50 #include "ctdb.h"
51 #include "ctdb_private.h"
52 #include "ctdbd_conn.h"
53 #include "dbwrap/dbwrap.h"
54 #include "dbwrap/dbwrap_private.h"
55 #include "dbwrap/dbwrap_ctdb.h"
56 #include "g_lock.h"
57 #include "messages.h"
58
59 struct db_ctdb_transaction_handle {
60         struct db_ctdb_ctx *ctx;
61         /*
62          * we store the writes done under a transaction:
63          */
64         struct ctdb_marshall_buffer *m_write;
65         uint32_t nesting;
66         bool nested_cancel;
67         char *lock_name;
68 };
69
70 struct db_ctdb_ctx {
71         struct db_context *db;
72         struct tdb_wrap *wtdb;
73         uint32_t db_id;
74         struct db_ctdb_transaction_handle *transaction;
75         struct g_lock_ctx *lock_ctx;
76 };
77
78 struct db_ctdb_rec {
79         struct db_ctdb_ctx *ctdb_ctx;
80         struct ctdb_ltdb_header header;
81         struct timeval lock_time;
82 };
83
84 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
85 {
86         enum TDB_ERROR tret = tdb_error(tdb);
87
88         return map_nt_error_from_tdb(tret);
89 }
90
91 struct db_ctdb_ltdb_parse_state {
92         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
93                        TDB_DATA data, void *private_data);
94         void *private_data;
95 };
96
97 static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
98                                void *private_data)
99 {
100         struct db_ctdb_ltdb_parse_state *state =
101                 (struct db_ctdb_ltdb_parse_state *)private_data;
102
103         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
104                 return -1;
105         }
106         state->parser(
107                 key, (struct ctdb_ltdb_header *)data.dptr,
108                 make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
109                               data.dsize - sizeof(struct ctdb_ltdb_header)),
110                 state->private_data);
111         return 0;
112 }
113
114 static NTSTATUS db_ctdb_ltdb_parse(
115         struct db_ctdb_ctx *db, TDB_DATA key,
116         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
117                        TDB_DATA data, void *private_data),
118         void *private_data)
119 {
120         struct db_ctdb_ltdb_parse_state state;
121         int ret;
122
123         state.parser = parser;
124         state.private_data = private_data;
125
126         ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
127                                &state);
128         if (ret == -1) {
129                 return NT_STATUS_NOT_FOUND;
130         }
131         return NT_STATUS_OK;
132 }
133
134 struct db_ctdb_ltdb_fetch_state {
135         struct ctdb_ltdb_header *header;
136         TALLOC_CTX *mem_ctx;
137         TDB_DATA *data;
138         bool oom;
139 };
140
141 static void db_ctdb_ltdb_fetch_parser(
142         TDB_DATA key, struct ctdb_ltdb_header *header,
143         TDB_DATA data, void *private_data)
144 {
145         struct db_ctdb_ltdb_fetch_state *state =
146                 (struct db_ctdb_ltdb_fetch_state *)private_data;
147
148         if (state->header != NULL) {
149                 memcpy(state->header, header, sizeof(struct ctdb_ltdb_header));
150         }
151         if (state->data == NULL) {
152                 return;
153         }
154         state->data->dsize = data.dsize;
155         if (data.dsize == 0) {
156                 state->data->dptr = NULL;
157                 return;
158         }
159         state->data->dptr = talloc_memdup(state->mem_ctx, data.dptr,
160                                           data.dsize);
161         if (state->data->dptr == NULL) {
162                 state->oom = true;
163                 return;
164         }
165 }
166
167 /**
168  * fetch a record from the tdb, separating out the header
169  * information and returning the body of the record.
170  */
171 static NTSTATUS db_ctdb_ltdb_fetch(struct db_ctdb_ctx *db,
172                                    TDB_DATA key,
173                                    struct ctdb_ltdb_header *header,
174                                    TALLOC_CTX *mem_ctx,
175                                    TDB_DATA *data)
176 {
177         struct db_ctdb_ltdb_fetch_state state;
178         NTSTATUS status;
179
180         state.header = header;
181         state.mem_ctx = mem_ctx;
182         state.data = data;
183         state.oom = false;
184
185         status = db_ctdb_ltdb_parse(db, key, db_ctdb_ltdb_fetch_parser,
186                                     &state);
187         if (!NT_STATUS_IS_OK(status)) {
188                 if (data) {
189                         ZERO_STRUCTP(data);
190                 }
191                 if (header) {
192                         header->dmaster = (uint32_t)-1;
193                         header->rsn = 0;
194                 }
195                 return status;
196         }
197         if (state.oom) {
198                 return NT_STATUS_NO_MEMORY;
199         }
200         return NT_STATUS_OK;
201 }
202
203 /*
204  * Store a record together with the ctdb record header
205  * in the local copy of the database.
206  */
207 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
208                                    TDB_DATA key,
209                                    struct ctdb_ltdb_header *header,
210                                    TDB_DATA data)
211 {
212         TALLOC_CTX *tmp_ctx = talloc_stackframe();
213         TDB_DATA rec;
214         int ret;
215
216         rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
217         rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
218
219         if (rec.dptr == NULL) {
220                 talloc_free(tmp_ctx);
221                 return NT_STATUS_NO_MEMORY;
222         }
223
224         memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
225         memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
226
227         ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
228
229         talloc_free(tmp_ctx);
230
231         return (ret == 0) ? NT_STATUS_OK
232                           : tdb_error_to_ntstatus(db->wtdb->tdb);
233
234 }
235
236 /*
237   form a ctdb_rec_data record from a key/data pair
238  */
239 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
240                                                   TDB_DATA key,
241                                                   struct ctdb_ltdb_header *header,
242                                                   TDB_DATA data)
243 {
244         size_t length;
245         struct ctdb_rec_data *d;
246
247         length = offsetof(struct ctdb_rec_data, data) + key.dsize +
248                 data.dsize + sizeof(*header);
249         d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
250         if (d == NULL) {
251                 return NULL;
252         }
253         d->length = length;
254         d->reqid = reqid;
255         d->keylen = key.dsize;
256         memcpy(&d->data[0], key.dptr, key.dsize);
257
258         d->datalen = data.dsize + sizeof(*header);
259         memcpy(&d->data[key.dsize], header, sizeof(*header));
260         memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
261         return d;
262 }
263
264
265 /* helper function for marshalling multiple records */
266 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
267                                                struct ctdb_marshall_buffer *m,
268                                                uint64_t db_id,
269                                                uint32_t reqid,
270                                                TDB_DATA key,
271                                                struct ctdb_ltdb_header *header,
272                                                TDB_DATA data)
273 {
274         struct ctdb_rec_data *r;
275         size_t m_size, r_size;
276         struct ctdb_marshall_buffer *m2 = NULL;
277
278         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
279         if (r == NULL) {
280                 talloc_free(m);
281                 return NULL;
282         }
283
284         if (m == NULL) {
285                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
286                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
287                 if (m == NULL) {
288                         goto done;
289                 }
290                 m->db_id = db_id;
291         }
292
293         m_size = talloc_get_size(m);
294         r_size = talloc_get_size(r);
295
296         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
297                 mem_ctx, m,  m_size + r_size);
298         if (m2 == NULL) {
299                 talloc_free(m);
300                 goto done;
301         }
302
303         memcpy(m_size + (uint8_t *)m2, r, r_size);
304
305         m2->count++;
306
307 done:
308         talloc_free(r);
309         return m2;
310 }
311
312 /* we've finished marshalling, return a data blob with the marshalled records */
313 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
314 {
315         TDB_DATA data;
316         data.dptr = (uint8_t *)m;
317         data.dsize = talloc_get_size(m);
318         return data;
319 }
320
321 /*
322    loop over a marshalling buffer
323
324      - pass r==NULL to start
325      - loop the number of times indicated by m->count
326 */
327 static struct ctdb_rec_data *db_ctdb_marshall_loop_next_key(
328         struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r, TDB_DATA *key)
329 {
330         if (r == NULL) {
331                 r = (struct ctdb_rec_data *)&m->data[0];
332         } else {
333                 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
334         }
335
336         key->dptr   = &r->data[0];
337         key->dsize  = r->keylen;
338         return r;
339 }
340
341 static bool db_ctdb_marshall_buf_parse(
342         struct ctdb_rec_data *r, uint32_t *reqid,
343         struct ctdb_ltdb_header **header, TDB_DATA *data)
344 {
345         if (r->datalen < sizeof(struct ctdb_ltdb_header)) {
346                 return false;
347         }
348
349         *reqid = r->reqid;
350
351         data->dptr  = &r->data[r->keylen] + sizeof(struct ctdb_ltdb_header);
352         data->dsize = r->datalen - sizeof(struct ctdb_ltdb_header);
353
354         *header = (struct ctdb_ltdb_header *)&r->data[r->keylen];
355
356         return true;
357 }
358
359 /**
360  * CTDB transaction destructor
361  */
362 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
363 {
364         NTSTATUS status;
365
366         status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
367         if (!NT_STATUS_IS_OK(status)) {
368                 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
369                           nt_errstr(status)));
370                 return -1;
371         }
372         return 0;
373 }
374
375 /**
376  * CTDB dbwrap API: transaction_start function
377  * starts a transaction on a persistent database
378  */
379 static int db_ctdb_transaction_start(struct db_context *db)
380 {
381         struct db_ctdb_transaction_handle *h;
382         NTSTATUS status;
383         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
384                                                         struct db_ctdb_ctx);
385
386         if (!db->persistent) {
387                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
388                          ctx->db_id));
389                 return -1;
390         }
391
392         if (ctx->transaction) {
393                 ctx->transaction->nesting++;
394                 DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
395                           ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
396                 return 0;
397         }
398
399         h = talloc_zero(db, struct db_ctdb_transaction_handle);
400         if (h == NULL) {
401                 DEBUG(0,(__location__ " oom for transaction handle\n"));
402                 return -1;
403         }
404
405         h->ctx = ctx;
406
407         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
408                                        (unsigned int)ctx->db_id);
409         if (h->lock_name == NULL) {
410                 DEBUG(0, ("talloc_asprintf failed\n"));
411                 TALLOC_FREE(h);
412                 return -1;
413         }
414
415         /*
416          * Wait a day, i.e. forever...
417          */
418         status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
419                              timeval_set(86400, 0));
420         if (!NT_STATUS_IS_OK(status)) {
421                 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
422                 TALLOC_FREE(h);
423                 return -1;
424         }
425
426         talloc_set_destructor(h, db_ctdb_transaction_destructor);
427
428         ctx->transaction = h;
429
430         DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
431
432         return 0;
433 }
434
435 static bool parse_newest_in_marshall_buffer(
436         struct ctdb_marshall_buffer *buf, TDB_DATA key,
437         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
438                        TDB_DATA data, void *private_data),
439         void *private_data)
440 {
441         struct ctdb_rec_data *rec = NULL;
442         struct ctdb_ltdb_header *h = NULL;
443         TDB_DATA data;
444         int i;
445
446         if (buf == NULL) {
447                 return false;
448         }
449
450         /*
451          * Walk the list of records written during this
452          * transaction. If we want to read one we have already
453          * written, return the last written sample. Thus we do not do
454          * a "break;" for the first hit, this record might have been
455          * overwritten later.
456          */
457
458         for (i=0; i<buf->count; i++) {
459                 TDB_DATA tkey;
460                 uint32_t reqid;
461
462                 rec = db_ctdb_marshall_loop_next_key(buf, rec, &tkey);
463                 if (rec == NULL) {
464                         return false;
465                 }
466
467                 if (!tdb_data_equal(key, tkey)) {
468                         continue;
469                 }
470
471                 if (!db_ctdb_marshall_buf_parse(rec, &reqid, &h, &data)) {
472                         return false;
473                 }
474         }
475
476         if (h == NULL) {
477                 return false;
478         }
479
480         parser(key, h, data, private_data);
481
482         return true;
483 }
484
485 struct pull_newest_from_marshall_buffer_state {
486         struct ctdb_ltdb_header *pheader;
487         TALLOC_CTX *mem_ctx;
488         TDB_DATA *pdata;
489 };
490
491 static void pull_newest_from_marshall_buffer_parser(
492         TDB_DATA key, struct ctdb_ltdb_header *header,
493         TDB_DATA data, void *private_data)
494 {
495         struct pull_newest_from_marshall_buffer_state *state =
496                 (struct pull_newest_from_marshall_buffer_state *)private_data;
497
498         if (state->pheader != NULL) {
499                 memcpy(state->pheader, header, sizeof(*state->pheader));
500         }
501         if (state->pdata != NULL) {
502                 state->pdata->dsize = data.dsize;
503                 state->pdata->dptr = (uint8_t *)talloc_memdup(
504                         state->mem_ctx, data.dptr, data.dsize);
505         }
506 }
507
508 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
509                                              TDB_DATA key,
510                                              struct ctdb_ltdb_header *pheader,
511                                              TALLOC_CTX *mem_ctx,
512                                              TDB_DATA *pdata)
513 {
514         struct pull_newest_from_marshall_buffer_state state;
515
516         state.pheader = pheader;
517         state.mem_ctx = mem_ctx;
518         state.pdata = pdata;
519
520         if (!parse_newest_in_marshall_buffer(
521                     buf, key, pull_newest_from_marshall_buffer_parser,
522                     &state)) {
523                 return false;
524         }
525         if ((pdata != NULL) && (pdata->dsize != 0) && (pdata->dptr == NULL)) {
526                 /* ENOMEM */
527                 return false;
528         }
529         return true;
530 }
531
532 /*
533   fetch a record inside a transaction
534  */
535 static NTSTATUS db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
536                                           TALLOC_CTX *mem_ctx,
537                                           TDB_DATA key, TDB_DATA *data)
538 {
539         struct db_ctdb_transaction_handle *h = db->transaction;
540         NTSTATUS status;
541         bool found;
542
543         found = pull_newest_from_marshall_buffer(h->m_write, key, NULL,
544                                                  mem_ctx, data);
545         if (found) {
546                 return NT_STATUS_OK;
547         }
548
549         status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
550
551         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
552                 *data = tdb_null;
553         }
554
555         return status;
556 }
557
558 /**
559  * Fetch a record from a persistent database
560  * without record locking and without an active transaction.
561  *
562  * This just fetches from the local database copy.
563  * Since the databases are kept in syc cluster-wide,
564  * there is no point in doing a ctdb call to fetch the
565  * record from the lmaster. It does even harm since migration
566  * of records bump their RSN and hence render the persistent
567  * database inconsistent.
568  */
569 static NTSTATUS db_ctdb_fetch_persistent(struct db_ctdb_ctx *db,
570                                          TALLOC_CTX *mem_ctx,
571                                          TDB_DATA key, TDB_DATA *data)
572 {
573         NTSTATUS status;
574
575         status = db_ctdb_ltdb_fetch(db, key, NULL, mem_ctx, data);
576
577         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
578                 *data = tdb_null;
579         }
580
581         return status;
582 }
583
584 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
585 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
586
587 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
588                                                           TALLOC_CTX *mem_ctx,
589                                                           TDB_DATA key)
590 {
591         struct db_record *result;
592         TDB_DATA ctdb_data;
593
594         if (!(result = talloc(mem_ctx, struct db_record))) {
595                 DEBUG(0, ("talloc failed\n"));
596                 return NULL;
597         }
598
599         result->private_data = ctx->transaction;
600
601         result->key.dsize = key.dsize;
602         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
603                                                     key.dsize);
604         if (result->key.dptr == NULL) {
605                 DEBUG(0, ("talloc failed\n"));
606                 TALLOC_FREE(result);
607                 return NULL;
608         }
609
610         result->store = db_ctdb_store_transaction;
611         result->delete_rec = db_ctdb_delete_transaction;
612
613         if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
614                                              NULL, result, &result->value)) {
615                 return result;
616         }
617
618         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
619         if (ctdb_data.dptr == NULL) {
620                 /* create the record */
621                 result->value = tdb_null;
622                 return result;
623         }
624
625         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
626         result->value.dptr = NULL;
627
628         if ((result->value.dsize != 0)
629             && !(result->value.dptr = (uint8_t *)talloc_memdup(
630                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
631                          result->value.dsize))) {
632                 DEBUG(0, ("talloc failed\n"));
633                 TALLOC_FREE(result);
634         }
635
636         SAFE_FREE(ctdb_data.dptr);
637
638         return result;
639 }
640
641 static int db_ctdb_record_destructor(struct db_record **recp)
642 {
643         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
644         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
645                 rec->private_data, struct db_ctdb_transaction_handle);
646         int ret = h->ctx->db->transaction_commit(h->ctx->db);
647         if (ret != 0) {
648                 DEBUG(0,(__location__ " transaction_commit failed\n"));
649         }
650         return 0;
651 }
652
653 /*
654   auto-create a transaction for persistent databases
655  */
656 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
657                                                          TALLOC_CTX *mem_ctx,
658                                                          TDB_DATA key)
659 {
660         int res;
661         struct db_record *rec, **recp;
662
663         res = db_ctdb_transaction_start(ctx->db);
664         if (res == -1) {
665                 return NULL;
666         }
667
668         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
669         if (rec == NULL) {
670                 ctx->db->transaction_cancel(ctx->db);
671                 return NULL;
672         }
673
674         /* destroy this transaction when we release the lock */
675         recp = talloc(rec, struct db_record *);
676         if (recp == NULL) {
677                 ctx->db->transaction_cancel(ctx->db);
678                 talloc_free(rec);
679                 return NULL;
680         }
681         *recp = rec;
682         talloc_set_destructor(recp, db_ctdb_record_destructor);
683         return rec;
684 }
685
686
687 /*
688   stores a record inside a transaction
689  */
690 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
691                                           TDB_DATA key, TDB_DATA data)
692 {
693         TALLOC_CTX *tmp_ctx = talloc_new(h);
694         TDB_DATA rec;
695         struct ctdb_ltdb_header header;
696
697         ZERO_STRUCT(header);
698
699         /* we need the header so we can update the RSN */
700
701         if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
702                                               NULL, NULL)) {
703
704                 rec = tdb_fetch_compat(h->ctx->wtdb->tdb, key);
705
706                 if (rec.dptr != NULL) {
707                         memcpy(&header, rec.dptr,
708                                sizeof(struct ctdb_ltdb_header));
709                         rec.dsize -= sizeof(struct ctdb_ltdb_header);
710
711                         /*
712                          * a special case, we are writing the same
713                          * data that is there now
714                          */
715                         if (data.dsize == rec.dsize &&
716                             memcmp(data.dptr,
717                                    rec.dptr + sizeof(struct ctdb_ltdb_header),
718                                    data.dsize) == 0) {
719                                 SAFE_FREE(rec.dptr);
720                                 talloc_free(tmp_ctx);
721                                 return NT_STATUS_OK;
722                         }
723                 }
724                 SAFE_FREE(rec.dptr);
725         }
726
727         header.dmaster = get_my_vnn();
728         header.rsn++;
729
730         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
731         if (h->m_write == NULL) {
732                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
733                 talloc_free(tmp_ctx);
734                 return NT_STATUS_NO_MEMORY;
735         }
736
737         talloc_free(tmp_ctx);
738         return NT_STATUS_OK;
739 }
740
741
742 /* 
743    a record store inside a transaction
744  */
745 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
746 {
747         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
748                 rec->private_data, struct db_ctdb_transaction_handle);
749         NTSTATUS status;
750
751         status = db_ctdb_transaction_store(h, rec->key, data);
752         return status;
753 }
754
755 /*
756    a record delete inside a transaction
757  */
758 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
759 {
760         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
761                 rec->private_data, struct db_ctdb_transaction_handle);
762         NTSTATUS status;
763
764         status =  db_ctdb_transaction_store(h, rec->key, tdb_null);
765         return status;
766 }
767
768 static void db_ctdb_fetch_db_seqnum_parser(
769         TDB_DATA key, struct ctdb_ltdb_header *header,
770         TDB_DATA data, void *private_data)
771 {
772         uint64_t *seqnum = (uint64_t *)private_data;
773
774         if (data.dsize != sizeof(uint64_t)) {
775                 *seqnum = 0;
776                 return;
777         }
778         memcpy(seqnum, data.dptr, sizeof(*seqnum));
779 }
780
781 /**
782  * Fetch the db sequence number of a persistent db directly from the db.
783  */
784 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
785                                                 uint64_t *seqnum)
786 {
787         NTSTATUS status;
788         TDB_DATA key;
789
790         if (seqnum == NULL) {
791                 return NT_STATUS_INVALID_PARAMETER;
792         }
793
794         key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
795
796         status = db_ctdb_ltdb_parse(
797                 db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
798
799         if (NT_STATUS_IS_OK(status)) {
800                 return NT_STATUS_OK;
801         }
802         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
803                 *seqnum = 0;
804                 return NT_STATUS_OK;
805         }
806         return status;
807 }
808
809 /**
810  * Store the database sequence number inside a transaction.
811  */
812 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
813                                         uint64_t seqnum)
814 {
815         NTSTATUS status;
816         const char *keyname = CTDB_DB_SEQNUM_KEY;
817         TDB_DATA key;
818         TDB_DATA data;
819
820         key = string_term_tdb_data(keyname);
821
822         data.dptr = (uint8_t *)&seqnum;
823         data.dsize = sizeof(uint64_t);
824
825         status = db_ctdb_transaction_store(h, key, data);
826
827         return status;
828 }
829
830 /*
831   commit a transaction
832  */
833 static int db_ctdb_transaction_commit(struct db_context *db)
834 {
835         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
836                                                         struct db_ctdb_ctx);
837         NTSTATUS rets;
838         int status;
839         struct db_ctdb_transaction_handle *h = ctx->transaction;
840         uint64_t old_seqnum, new_seqnum;
841         int ret;
842
843         if (h == NULL) {
844                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
845                 return -1;
846         }
847
848         if (h->nested_cancel) {
849                 db->transaction_cancel(db);
850                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
851                 return -1;
852         }
853
854         if (h->nesting != 0) {
855                 h->nesting--;
856                 DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
857                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
858                 return 0;
859         }
860
861         if (h->m_write == NULL) {
862                 /*
863                  * No changes were made, so don't change the seqnum,
864                  * don't push to other node, just exit with success.
865                  */
866                 ret = 0;
867                 goto done;
868         }
869
870         DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));
871
872         /*
873          * As the last db action before committing, bump the database sequence
874          * number. Note that this undoes all changes to the seqnum records
875          * performed under the transaction. This record is not meant to be
876          * modified by user interaction. It is for internal use only...
877          */
878         rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
879         if (!NT_STATUS_IS_OK(rets)) {
880                 DEBUG(1, (__location__ " failed to fetch the db sequence number "
881                           "in transaction commit on db 0x%08x\n", ctx->db_id));
882                 ret = -1;
883                 goto done;
884         }
885
886         new_seqnum = old_seqnum + 1;
887
888         rets = db_ctdb_store_db_seqnum(h, new_seqnum);
889         if (!NT_STATUS_IS_OK(rets)) {
890                 DEBUG(1, (__location__ "failed to store the db sequence number "
891                           " in transaction commit on db 0x%08x\n", ctx->db_id));
892                 ret = -1;
893                 goto done;
894         }
895
896 again:
897         /* tell ctdbd to commit to the other nodes */
898         rets = ctdbd_control_local(messaging_ctdbd_connection(),
899                                    CTDB_CONTROL_TRANS3_COMMIT,
900                                    h->ctx->db_id, 0,
901                                    db_ctdb_marshall_finish(h->m_write),
902                                    NULL, NULL, &status);
903         if (!NT_STATUS_IS_OK(rets) || status != 0) {
904                 /*
905                  * The TRANS3_COMMIT control should only possibly fail when a
906                  * recovery has been running concurrently. In any case, the db
907                  * will be the same on all nodes, either the new copy or the
908                  * old copy.  This can be detected by comparing the old and new
909                  * local sequence numbers.
910                  */
911                 rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
912                 if (!NT_STATUS_IS_OK(rets)) {
913                         DEBUG(1, (__location__ " failed to refetch db sequence "
914                                   "number after failed TRANS3_COMMIT\n"));
915                         ret = -1;
916                         goto done;
917                 }
918
919                 if (new_seqnum == old_seqnum) {
920                         /* Recovery prevented all our changes: retry. */
921                         goto again;
922                 }
923                 if (new_seqnum != (old_seqnum + 1)) {
924                         DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
925                                   "old_seqnum[%lu] + (0 or 1) after failed "
926                                   "TRANS3_COMMIT - this should not happen!\n",
927                                   (unsigned long)new_seqnum,
928                                   (unsigned long)old_seqnum));
929                         ret = -1;
930                         goto done;
931                 }
932                 /*
933                  * Recovery propagated our changes to all nodes, completing
934                  * our commit for us - succeed.
935                  */
936         }
937
938         ret = 0;
939
940 done:
941         h->ctx->transaction = NULL;
942         talloc_free(h);
943         return ret;
944 }
945
946
947 /*
948   cancel a transaction
949  */
950 static int db_ctdb_transaction_cancel(struct db_context *db)
951 {
952         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
953                                                         struct db_ctdb_ctx);
954         struct db_ctdb_transaction_handle *h = ctx->transaction;
955
956         if (h == NULL) {
957                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
958                 return -1;
959         }
960
961         if (h->nesting != 0) {
962                 h->nesting--;
963                 h->nested_cancel = true;
964                 DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
965                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
966                 return 0;
967         }
968
969         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
970
971         ctx->transaction = NULL;
972         talloc_free(h);
973         return 0;
974 }
975
976
977 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
978 {
979         struct db_ctdb_rec *crec = talloc_get_type_abort(
980                 rec->private_data, struct db_ctdb_rec);
981
982         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
983 }
984
985
986
987 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
988 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
989 {
990         NTSTATUS status;
991         struct ctdb_control_schedule_for_deletion *dd;
992         TDB_DATA indata;
993         int cstatus;
994         struct db_ctdb_rec *crec = talloc_get_type_abort(
995                 rec->private_data, struct db_ctdb_rec);
996
997         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + rec->key.dsize;
998         indata.dptr = talloc_zero_array(crec, uint8_t, indata.dsize);
999         if (indata.dptr == NULL) {
1000                 DEBUG(0, (__location__ " talloc failed!\n"));
1001                 return NT_STATUS_NO_MEMORY;
1002         }
1003
1004         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
1005         dd->db_id = crec->ctdb_ctx->db_id;
1006         dd->hdr = crec->header;
1007         dd->keylen = rec->key.dsize;
1008         memcpy(dd->key, rec->key.dptr, rec->key.dsize);
1009
1010         status = ctdbd_control_local(messaging_ctdbd_connection(),
1011                                      CTDB_CONTROL_SCHEDULE_FOR_DELETION,
1012                                      crec->ctdb_ctx->db_id,
1013                                      CTDB_CTRL_FLAG_NOREPLY, /* flags */
1014                                      indata,
1015                                      NULL, /* outdata */
1016                                      NULL, /* errmsg */
1017                                      &cstatus);
1018         talloc_free(indata.dptr);
1019
1020         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
1021                 DEBUG(1, (__location__ " Error sending local control "
1022                           "SCHEDULE_FOR_DELETION: %s, cstatus = %d\n",
1023                           nt_errstr(status), cstatus));
1024                 if (NT_STATUS_IS_OK(status)) {
1025                         status = NT_STATUS_UNSUCCESSFUL;
1026                 }
1027         }
1028
1029         return status;
1030 }
1031 #endif
1032
1033 static NTSTATUS db_ctdb_delete(struct db_record *rec)
1034 {
1035         TDB_DATA data;
1036         NTSTATUS status;
1037
1038         /*
1039          * We have to store the header with empty data. TODO: Fix the
1040          * tdb-level cleanup
1041          */
1042
1043         ZERO_STRUCT(data);
1044
1045         status = db_ctdb_store(rec, data, 0);
1046         if (!NT_STATUS_IS_OK(status)) {
1047                 return status;
1048         }
1049
1050 #ifdef HAVE_CTDB_CONTROL_SCHEDULE_FOR_DELETION_DECL
1051         status = db_ctdb_send_schedule_for_deletion(rec);
1052 #endif
1053
1054         return status;
1055 }
1056
1057 static int db_ctdb_record_destr(struct db_record* data)
1058 {
1059         struct db_ctdb_rec *crec = talloc_get_type_abort(
1060                 data->private_data, struct db_ctdb_rec);
1061         int threshold;
1062
1063         DEBUG(10, (DEBUGLEVEL > 10
1064                    ? "Unlocking db %u key %s\n"
1065                    : "Unlocking db %u key %.20s\n",
1066                    (int)crec->ctdb_ctx->db_id,
1067                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
1068                               data->key.dsize)));
1069
1070         tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
1071
1072         threshold = lp_ctdb_locktime_warn_threshold();
1073         if (threshold != 0) {
1074                 double timediff = timeval_elapsed(&crec->lock_time);
1075                 if ((timediff * 1000) > threshold) {
1076                         const char *key;
1077
1078                         key = hex_encode_talloc(data,
1079                                                 (unsigned char *)data->key.dptr,
1080                                                 data->key.dsize);
1081                         DEBUG(0, ("Held tdb lock on db %s, key %s %f seconds\n",
1082                                   tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
1083                                   timediff));
1084                 }
1085         }
1086
1087         return 0;
1088 }
1089
1090 /**
1091  * Check whether we have a valid local copy of the given record,
1092  * either for reading or for writing.
1093  */
1094 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, bool read_only)
1095 {
1096         struct ctdb_ltdb_header *hdr;
1097
1098         if (ctdb_data.dptr == NULL)
1099                 return false;
1100
1101         if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header))
1102                 return false;
1103
1104         hdr = (struct ctdb_ltdb_header *)ctdb_data.dptr;
1105
1106 #ifdef HAVE_CTDB_WANT_READONLY_DECL
1107         if (hdr->dmaster != get_my_vnn()) {
1108                 /* If we're not dmaster, it must be r/o copy. */
1109                 return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
1110         }
1111
1112         /*
1113          * If we want write access, no one may have r/o copies.
1114          */
1115         return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
1116 #else
1117         return (hdr->dmaster == get_my_vnn());
1118 #endif
1119 }
1120
1121 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
1122                                                TALLOC_CTX *mem_ctx,
1123                                                TDB_DATA key,
1124                                                bool tryonly)
1125 {
1126         struct db_record *result;
1127         struct db_ctdb_rec *crec;
1128         NTSTATUS status;
1129         TDB_DATA ctdb_data;
1130         int migrate_attempts = 0;
1131         int lockret;
1132
1133         if (!(result = talloc(mem_ctx, struct db_record))) {
1134                 DEBUG(0, ("talloc failed\n"));
1135                 return NULL;
1136         }
1137
1138         if (!(crec = talloc_zero(result, struct db_ctdb_rec))) {
1139                 DEBUG(0, ("talloc failed\n"));
1140                 TALLOC_FREE(result);
1141                 return NULL;
1142         }
1143
1144         result->db = ctx->db;
1145         result->private_data = (void *)crec;
1146         crec->ctdb_ctx = ctx;
1147
1148         result->key.dsize = key.dsize;
1149         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
1150                                                     key.dsize);
1151         if (result->key.dptr == NULL) {
1152                 DEBUG(0, ("talloc failed\n"));
1153                 TALLOC_FREE(result);
1154                 return NULL;
1155         }
1156
1157         /*
1158          * Do a blocking lock on the record
1159          */
1160 again:
1161
1162         if (DEBUGLEVEL >= 10) {
1163                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1164                 DEBUG(10, (DEBUGLEVEL > 10
1165                            ? "Locking db %u key %s\n"
1166                            : "Locking db %u key %.20s\n",
1167                            (int)crec->ctdb_ctx->db_id, keystr));
1168                 TALLOC_FREE(keystr);
1169         }
1170
1171         lockret = tryonly
1172                 ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
1173                 : tdb_chainlock(ctx->wtdb->tdb, key);
1174         if (lockret != 0) {
1175                 DEBUG(3, ("tdb_chainlock failed\n"));
1176                 TALLOC_FREE(result);
1177                 return NULL;
1178         }
1179
1180         result->store = db_ctdb_store;
1181         result->delete_rec = db_ctdb_delete;
1182         talloc_set_destructor(result, db_ctdb_record_destr);
1183
1184         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
1185
1186         /*
1187          * See if we have a valid record and we are the dmaster. If so, we can
1188          * take the shortcut and just return it.
1189          */
1190
1191         if (!db_ctdb_can_use_local_copy(ctdb_data, false)) {
1192                 SAFE_FREE(ctdb_data.dptr);
1193                 tdb_chainunlock(ctx->wtdb->tdb, key);
1194                 talloc_set_destructor(result, NULL);
1195
1196                 if (tryonly && (migrate_attempts != 0)) {
1197                         DEBUG(5, ("record migrated away again\n"));
1198                         TALLOC_FREE(result);
1199                         return NULL;
1200                 }
1201
1202                 migrate_attempts += 1;
1203
1204                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u) %u\n",
1205                            ctdb_data.dptr, ctdb_data.dptr ?
1206                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
1207                            get_my_vnn(),
1208                            ctdb_data.dptr ?
1209                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
1210
1211                 status = ctdbd_migrate(messaging_ctdbd_connection(), ctx->db_id,
1212                                        key);
1213                 if (!NT_STATUS_IS_OK(status)) {
1214                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1215                                   nt_errstr(status)));
1216                         TALLOC_FREE(result);
1217                         return NULL;
1218                 }
1219                 /* now its migrated, try again */
1220                 goto again;
1221         }
1222
1223         if (migrate_attempts > 10) {
1224                 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s needed %d "
1225                           "attempts\n", tdb_name(ctx->wtdb->tdb),
1226                           hex_encode_talloc(talloc_tos(),
1227                                             (unsigned char *)key.dptr,
1228                                             key.dsize),
1229                           migrate_attempts));
1230         }
1231
1232         GetTimeOfDay(&crec->lock_time);
1233
1234         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1235
1236         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1237         result->value.dptr = NULL;
1238
1239         if ((result->value.dsize != 0)
1240             && !(result->value.dptr = (uint8_t *)talloc_memdup(
1241                          result, ctdb_data.dptr + sizeof(crec->header),
1242                          result->value.dsize))) {
1243                 DEBUG(0, ("talloc failed\n"));
1244                 TALLOC_FREE(result);
1245         }
1246
1247         SAFE_FREE(ctdb_data.dptr);
1248
1249         return result;
1250 }
1251
1252 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1253                                               TALLOC_CTX *mem_ctx,
1254                                               TDB_DATA key)
1255 {
1256         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1257                                                         struct db_ctdb_ctx);
1258
1259         if (ctx->transaction != NULL) {
1260                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1261         }
1262
1263         if (db->persistent) {
1264                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1265         }
1266
1267         return fetch_locked_internal(ctx, mem_ctx, key, false);
1268 }
1269
1270 static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
1271                                                   TALLOC_CTX *mem_ctx,
1272                                                   TDB_DATA key)
1273 {
1274         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1275                                                         struct db_ctdb_ctx);
1276
1277         if (ctx->transaction != NULL) {
1278                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1279         }
1280
1281         if (db->persistent) {
1282                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1283         }
1284
1285         return fetch_locked_internal(ctx, mem_ctx, key, true);
1286 }
1287
1288 /*
1289   fetch (unlocked, no migration) operation on ctdb
1290  */
1291 static NTSTATUS db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
1292                               TDB_DATA key, TDB_DATA *data)
1293 {
1294         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1295                                                         struct db_ctdb_ctx);
1296         NTSTATUS status;
1297         TDB_DATA ctdb_data;
1298
1299         if (ctx->transaction) {
1300                 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1301         }
1302
1303         if (db->persistent) {
1304                 return db_ctdb_fetch_persistent(ctx, mem_ctx, key, data);
1305         }
1306
1307         /* try a direct fetch */
1308         ctdb_data = tdb_fetch_compat(ctx->wtdb->tdb, key);
1309
1310         /*
1311          * See if we have a valid record and we are the dmaster. If so, we can
1312          * take the shortcut and just return it.
1313          * we bypass the dmaster check for persistent databases
1314          */
1315         if (db_ctdb_can_use_local_copy(ctdb_data, true)) {
1316                 /*
1317                  * We have a valid local copy - avoid the ctdb protocol op
1318                  */
1319                 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1320
1321                 data->dptr = (uint8_t *)talloc_memdup(
1322                         mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1323                         data->dsize);
1324
1325                 SAFE_FREE(ctdb_data.dptr);
1326
1327                 if (data->dptr == NULL) {
1328                         return NT_STATUS_NO_MEMORY;
1329                 }
1330                 return NT_STATUS_OK;
1331         }
1332
1333         SAFE_FREE(ctdb_data.dptr);
1334
1335         /*
1336          * We weren't able to get it locally - ask ctdb to fetch it for us.
1337          * If we already had *something*, it's probably worth making a local
1338          * read-only copy.
1339          */
1340         status = ctdbd_fetch(messaging_ctdbd_connection(), ctx->db_id, key,
1341                              mem_ctx, data,
1342                              ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header));
1343         if (!NT_STATUS_IS_OK(status)) {
1344                 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1345         }
1346
1347         return status;
1348 }
1349
1350 static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
1351                                      void (*parser)(TDB_DATA key,
1352                                                     TDB_DATA data,
1353                                                     void *private_data),
1354                                      void *private_data)
1355 {
1356         NTSTATUS status;
1357         TDB_DATA data;
1358
1359         status = db_ctdb_fetch(db, talloc_tos(), key, &data);
1360         if (!NT_STATUS_IS_OK(status)) {
1361                 return status;
1362         }
1363         parser(key, data, private_data);
1364         TALLOC_FREE(data.dptr);
1365         return NT_STATUS_OK;
1366 }
1367
1368 struct traverse_state {
1369         struct db_context *db;
1370         int (*fn)(struct db_record *rec, void *private_data);
1371         void *private_data;
1372         int count;
1373 };
1374
1375 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1376 {
1377         struct traverse_state *state = (struct traverse_state *)private_data;
1378         struct db_record *rec;
1379         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1380         /* we have to give them a locked record to prevent races */
1381         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1382         if (rec && rec->value.dsize > 0) {
1383                 state->fn(rec, state->private_data);
1384         }
1385         talloc_free(tmp_ctx);
1386 }
1387
1388 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1389                                         void *private_data)
1390 {
1391         struct traverse_state *state = (struct traverse_state *)private_data;
1392         struct db_record *rec;
1393         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1394         int ret = 0;
1395
1396         /*
1397          * Skip the __db_sequence_number__ key:
1398          * This is used for persistent transactions internally.
1399          */
1400         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1401             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1402         {
1403                 goto done;
1404         }
1405
1406         /* we have to give them a locked record to prevent races */
1407         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1408         if (rec && rec->value.dsize > 0) {
1409                 ret = state->fn(rec, state->private_data);
1410         }
1411
1412 done:
1413         talloc_free(tmp_ctx);
1414         return ret;
1415 }
1416
1417 /* wrapper to use traverse_persistent_callback with dbwrap */
1418 static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
1419 {
1420         return traverse_persistent_callback(NULL, rec->key, rec->value, data);
1421 }
1422
1423
1424 static int db_ctdb_traverse(struct db_context *db,
1425                             int (*fn)(struct db_record *rec,
1426                                       void *private_data),
1427                             void *private_data)
1428 {
1429         NTSTATUS status;
1430         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1431                                                         struct db_ctdb_ctx);
1432         struct traverse_state state;
1433
1434         state.db = db;
1435         state.fn = fn;
1436         state.private_data = private_data;
1437         state.count = 0;
1438
1439         if (db->persistent) {
1440                 struct tdb_context *ltdb = ctx->wtdb->tdb;
1441                 int ret;
1442
1443                 /* for persistent databases we don't need to do a ctdb traverse,
1444                    we can do a faster local traverse */
1445                 ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
1446                 if (ret < 0) {
1447                         return ret;
1448                 }
1449                 if (ctx->transaction && ctx->transaction->m_write) {
1450                         /*
1451                          * we now have to handle keys not yet
1452                          * present at transaction start
1453                          */
1454                         struct db_context *newkeys = db_open_rbt(talloc_tos());
1455                         struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
1456                         struct ctdb_rec_data *rec=NULL;
1457                         int i;
1458                         int count = 0;
1459
1460                         if (newkeys == NULL) {
1461                                 return -1;
1462                         }
1463
1464                         for (i=0; i<mbuf->count; i++) {
1465                                 TDB_DATA key;
1466                                 rec = db_ctdb_marshall_loop_next_key(
1467                                         mbuf, rec, &key);
1468                                 SMB_ASSERT(rec != NULL);
1469
1470                                 if (!tdb_exists(ltdb, key)) {
1471                                         dbwrap_store(newkeys, key, tdb_null, 0);
1472                                 }
1473                         }
1474                         status = dbwrap_traverse(newkeys,
1475                                                  traverse_persistent_callback_dbwrap,
1476                                                  &state,
1477                                                  &count);
1478                         talloc_free(newkeys);
1479                         if (!NT_STATUS_IS_OK(status)) {
1480                                 return -1;
1481                         }
1482                         ret += count;
1483                 }
1484                 return ret;
1485         }
1486
1487         status = ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1488         if (!NT_STATUS_IS_OK(status)) {
1489                 return -1;
1490         }
1491         return state.count;
1492 }
1493
1494 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1495 {
1496         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1497 }
1498
1499 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1500 {
1501         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1502 }
1503
1504 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1505 {
1506         struct traverse_state *state = (struct traverse_state *)private_data;
1507         struct db_record rec;
1508         rec.key = key;
1509         rec.value = data;
1510         rec.store = db_ctdb_store_deny;
1511         rec.delete_rec = db_ctdb_delete_deny;
1512         rec.private_data = state->db;
1513         state->fn(&rec, state->private_data);
1514         state->count++;
1515 }
1516
1517 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1518                                         void *private_data)
1519 {
1520         struct traverse_state *state = (struct traverse_state *)private_data;
1521         struct db_record rec;
1522
1523         /*
1524          * Skip the __db_sequence_number__ key:
1525          * This is used for persistent transactions internally.
1526          */
1527         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1528             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1529         {
1530                 return 0;
1531         }
1532
1533         rec.key = kbuf;
1534         rec.value = dbuf;
1535         rec.store = db_ctdb_store_deny;
1536         rec.delete_rec = db_ctdb_delete_deny;
1537         rec.private_data = state->db;
1538
1539         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1540                 /* a deleted record */
1541                 return 0;
1542         }
1543         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1544         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1545
1546         state->count++;
1547         return state->fn(&rec, state->private_data);
1548 }
1549
1550 static int db_ctdb_traverse_read(struct db_context *db,
1551                                  int (*fn)(struct db_record *rec,
1552                                            void *private_data),
1553                                  void *private_data)
1554 {
1555         NTSTATUS status;
1556         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1557                                                         struct db_ctdb_ctx);
1558         struct traverse_state state;
1559
1560         state.db = db;
1561         state.fn = fn;
1562         state.private_data = private_data;
1563         state.count = 0;
1564
1565         if (db->persistent) {
1566                 /* for persistent databases we don't need to do a ctdb traverse,
1567                    we can do a faster local traverse */
1568                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1569         }
1570
1571         status = ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1572         if (!NT_STATUS_IS_OK(status)) {
1573                 return -1;
1574         }
1575         return state.count;
1576 }
1577
1578 static int db_ctdb_get_seqnum(struct db_context *db)
1579 {
1580         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1581                                                         struct db_ctdb_ctx);
1582         return tdb_get_seqnum(ctx->wtdb->tdb);
1583 }
1584
1585 static void db_ctdb_id(struct db_context *db, const uint8_t **id,
1586                        size_t *idlen)
1587 {
1588         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1589                 db->private_data, struct db_ctdb_ctx);
1590
1591         *id = (uint8_t *)&ctx->db_id;
1592         *idlen = sizeof(ctx->db_id);
1593 }
1594
1595 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1596                                 const char *name,
1597                                 int hash_size, int tdb_flags,
1598                                 int open_flags, mode_t mode,
1599                                 enum dbwrap_lock_order lock_order)
1600 {
1601         struct db_context *result;
1602         struct db_ctdb_ctx *db_ctdb;
1603         char *db_path;
1604         struct ctdbd_connection *conn;
1605         struct loadparm_context *lp_ctx;
1606         struct ctdb_db_priority prio;
1607         NTSTATUS status;
1608         int cstatus;
1609
1610         if (!lp_clustering()) {
1611                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1612                 return NULL;
1613         }
1614
1615         if (!(result = talloc_zero(mem_ctx, struct db_context))) {
1616                 DEBUG(0, ("talloc failed\n"));
1617                 TALLOC_FREE(result);
1618                 return NULL;
1619         }
1620
1621         if (!(db_ctdb = talloc(result, struct db_ctdb_ctx))) {
1622                 DEBUG(0, ("talloc failed\n"));
1623                 TALLOC_FREE(result);
1624                 return NULL;
1625         }
1626
1627         db_ctdb->transaction = NULL;
1628         db_ctdb->db = result;
1629
1630         conn = messaging_ctdbd_connection();
1631         if (conn == NULL) {
1632                 DEBUG(1, ("Could not connect to ctdb\n"));
1633                 TALLOC_FREE(result);
1634                 return NULL;
1635         }
1636
1637         if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn, name, &db_ctdb->db_id, tdb_flags))) {
1638                 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1639                 TALLOC_FREE(result);
1640                 return NULL;
1641         }
1642
1643         db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id);
1644
1645         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1646         result->lock_order = lock_order;
1647
1648         /* only pass through specific flags */
1649         tdb_flags &= TDB_SEQNUM;
1650
1651         /* honor permissions if user has specified O_CREAT */
1652         if (open_flags & O_CREAT) {
1653                 chmod(db_path, mode);
1654         }
1655
1656         prio.db_id = db_ctdb->db_id;
1657         prio.priority = lock_order;
1658
1659         status = ctdbd_control_local(
1660                 conn, CTDB_CONTROL_SET_DB_PRIORITY, 0, 0,
1661                 make_tdb_data((uint8_t *)&prio, sizeof(prio)),
1662                 NULL, NULL, &cstatus);
1663
1664         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1665                 DEBUG(1, ("CTDB_CONTROL_SET_DB_PRIORITY failed: %s, %d\n",
1666                           nt_errstr(status), cstatus));
1667                 TALLOC_FREE(result);
1668                 return NULL;
1669         }
1670
1671         lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
1672
1673         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags,
1674                                       O_RDWR, 0, lp_ctx);
1675         talloc_unlink(db_path, lp_ctx);
1676         if (db_ctdb->wtdb == NULL) {
1677                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1678                 TALLOC_FREE(result);
1679                 return NULL;
1680         }
1681         talloc_free(db_path);
1682
1683         if (result->persistent) {
1684                 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb,
1685                                                     ctdb_conn_msg_ctx(conn));
1686                 if (db_ctdb->lock_ctx == NULL) {
1687                         DEBUG(0, ("g_lock_ctx_init failed\n"));
1688                         TALLOC_FREE(result);
1689                         return NULL;
1690                 }
1691         }
1692
1693         result->private_data = (void *)db_ctdb;
1694         result->fetch_locked = db_ctdb_fetch_locked;
1695         result->try_fetch_locked = db_ctdb_try_fetch_locked;
1696         result->parse_record = db_ctdb_parse_record;
1697         result->traverse = db_ctdb_traverse;
1698         result->traverse_read = db_ctdb_traverse_read;
1699         result->get_seqnum = db_ctdb_get_seqnum;
1700         result->transaction_start = db_ctdb_transaction_start;
1701         result->transaction_commit = db_ctdb_transaction_commit;
1702         result->transaction_cancel = db_ctdb_transaction_cancel;
1703         result->id = db_ctdb_id;
1704         result->stored_callback = NULL;
1705
1706         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1707                  name, db_ctdb->db_id));
1708
1709         return result;
1710 }
1711
1712 #else /* CLUSTER_SUPPORT */
1713
1714 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1715                                 const char *name,
1716                                 int hash_size, int tdb_flags,
1717                                 int open_flags, mode_t mode,
1718                                 enum dbwrap_lock_order lock_order)
1719 {
1720         DEBUG(3, ("db_open_ctdb: no cluster support!\n"));
1721         errno = ENOSYS;
1722         return NULL;
1723 }
1724
1725 #endif