ctdb_conn: Use messaging_ctdb_connection
[samba.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
1 /*
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007-2009
5    Copyright (C) Michael Adam 2009
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
24 #include "util_tdb.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
29
30 #include "ctdb/include/ctdb_protocol.h"
31 #include "ctdbd_conn.h"
32 #include "dbwrap/dbwrap.h"
33 #include "dbwrap/dbwrap_private.h"
34 #include "dbwrap/dbwrap_ctdb.h"
35 #include "g_lock.h"
36 #include "messages.h"
37 #include "messages_ctdb.h"
38 #include "lib/cluster_support.h"
39 #include "lib/util/tevent_ntstatus.h"
40
/*
 * State of one open transaction on a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
        /* database context this transaction was started on */
        struct db_ctdb_ctx *ctx;
        /*
         * we store the writes done under a transaction:
         */
        struct ctdb_marshall_buffer *m_write;
        /* number of nested transaction_start calls on top of the first */
        uint32_t nesting;
        /* set by a nested cancel; makes the outer commit fail */
        bool nested_cancel;
        /* g_lock name taken in transaction_start, released by the destructor */
        char *lock_name;
};
51
/*
 * Per-database private context of the ctdb dbwrap backend.
 */
struct db_ctdb_ctx {
        /* the dbwrap context this private data hangs off */
        struct db_context *db;
        /* local copy of the database */
        struct tdb_wrap *wtdb;
        /* database id, used in log messages and in controls sent to ctdbd */
        uint32_t db_id;
        /* currently open transaction, NULL when none is active */
        struct db_ctdb_transaction_handle *transaction;
        /* g_lock context used to take the per-database transaction lock */
        struct g_lock_ctx *lock_ctx;

        /* thresholds for warning messages */
        int warn_unlock_msecs;
        int warn_migrate_msecs;
        int warn_migrate_attempts;
        int warn_locktime_msecs;
};
65
/*
 * Private data attached to a db_record outside of transactions.
 */
struct db_ctdb_rec {
        /* database context the record belongs to */
        struct db_ctdb_ctx *ctdb_ctx;
        /* ctdb record header, written in front of the data by db_ctdb_storev */
        struct ctdb_ltdb_header header;
        /* NOTE(review): presumably when the record lock was taken - set outside this chunk, confirm */
        struct timeval lock_time;
};
71
/*
 * File-global state for the async ctdbd connection.
 */
struct ctdb_async_ctx {
        /* true once async_conn has been connected and its fd event set up */
        bool initialized;
        /* connection created by ctdb_async_ctx_init_internal() */
        struct ctdbd_connection *async_conn;
};

/* single instance, managed by ctdb_async_ctx_init_internal() */
static struct ctdb_async_ctx ctdb_async_ctx;
78
79 static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
80                                         struct tevent_context *ev,
81                                         bool reinit)
82 {
83         int ret;
84
85         if (reinit) {
86                 TALLOC_FREE(ctdb_async_ctx.async_conn);
87                 ctdb_async_ctx.initialized = false;
88         }
89
90         if (ctdb_async_ctx.initialized) {
91                 return 0;
92         }
93
94         become_root();
95         ret = ctdbd_init_connection(mem_ctx,
96                                     lp_ctdbd_socket(),
97                                     lp_ctdb_timeout(),
98                                     &ctdb_async_ctx.async_conn);
99         unbecome_root();
100
101         if (ctdb_async_ctx.async_conn == NULL) {
102                 DBG_ERR("ctdbd_init_connection failed\n");
103                 return EIO;
104         }
105
106         ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
107         if (ret != 0) {
108                 DBG_ERR("ctdbd_setup_fde failed\n");
109                 TALLOC_FREE(ctdb_async_ctx.async_conn);
110                 return ret;
111         }
112
113         ctdb_async_ctx.initialized = true;
114         return 0;
115 }
116
117 static int ctdb_async_ctx_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
118 {
119         return ctdb_async_ctx_init_internal(mem_ctx, ev, false);
120 }
121
122 int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
123 {
124         return ctdb_async_ctx_init_internal(mem_ctx, ev, true);
125 }
126
127 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
128 {
129         enum TDB_ERROR tret = tdb_error(tdb);
130
131         return map_nt_error_from_tdb(tret);
132 }
133
/*
 * Closure handed through tdb_parse_record() to db_ctdb_ltdb_parser().
 */
struct db_ctdb_ltdb_parse_state {
        /* caller's callback; gets the record split into header and data */
        void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
                       TDB_DATA data, void *private_data);
        /* opaque pointer passed through to parser */
        void *private_data;
};
139
140 static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
141                                void *private_data)
142 {
143         struct db_ctdb_ltdb_parse_state *state =
144                 (struct db_ctdb_ltdb_parse_state *)private_data;
145
146         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
147                 return -1;
148         }
149
150         state->parser(
151                 key, (struct ctdb_ltdb_header *)data.dptr,
152                 make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
153                               data.dsize - sizeof(struct ctdb_ltdb_header)),
154                 state->private_data);
155         return 0;
156 }
157
158 static NTSTATUS db_ctdb_ltdb_parse(
159         struct db_ctdb_ctx *db, TDB_DATA key,
160         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
161                        TDB_DATA data, void *private_data),
162         void *private_data)
163 {
164         struct db_ctdb_ltdb_parse_state state;
165         int ret;
166
167         state.parser = parser;
168         state.private_data = private_data;
169
170         ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
171                                &state);
172         if (ret == -1) {
173                 return NT_STATUS_NOT_FOUND;
174         }
175         return NT_STATUS_OK;
176 }
177
178 /*
179  * Store a record together with the ctdb record header
180  * in the local copy of the database.
181  */
182 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
183                                    TDB_DATA key,
184                                    struct ctdb_ltdb_header *header,
185                                    const TDB_DATA *dbufs, int num_dbufs)
186 {
187         TDB_DATA recs[num_dbufs+1];
188         int ret;
189
190         recs[0] = (TDB_DATA) { .dptr = (uint8_t *)header,
191                                .dsize = sizeof(struct ctdb_ltdb_header) };
192         memcpy(&recs[1], dbufs, sizeof(TDB_DATA) * num_dbufs);
193
194         ret = tdb_storev(db->wtdb->tdb, key, recs, num_dbufs + 1, TDB_REPLACE);
195
196         return (ret == 0) ? NT_STATUS_OK
197                           : tdb_error_to_ntstatus(db->wtdb->tdb);
198
199 }
200
201 /*
202   form a ctdb_rec_data record from a key/data pair
203  */
204 static struct ctdb_rec_data_old *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
205                                                   TDB_DATA key,
206                                                   struct ctdb_ltdb_header *header,
207                                                   TDB_DATA data)
208 {
209         size_t length;
210         struct ctdb_rec_data_old *d;
211
212         length = offsetof(struct ctdb_rec_data_old, data) + key.dsize +
213                 data.dsize + sizeof(*header);
214         d = (struct ctdb_rec_data_old *)talloc_size(mem_ctx, length);
215         if (d == NULL) {
216                 return NULL;
217         }
218         d->length = length;
219         d->reqid = reqid;
220         d->keylen = key.dsize;
221         memcpy(&d->data[0], key.dptr, key.dsize);
222
223         d->datalen = data.dsize + sizeof(*header);
224         memcpy(&d->data[key.dsize], header, sizeof(*header));
225         memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
226         return d;
227 }
228
229
230 /* helper function for marshalling multiple records */
231 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
232                                                struct ctdb_marshall_buffer *m,
233                                                uint32_t db_id,
234                                                uint32_t reqid,
235                                                TDB_DATA key,
236                                                struct ctdb_ltdb_header *header,
237                                                TDB_DATA data)
238 {
239         struct ctdb_rec_data_old *r;
240         size_t m_size, r_size;
241         struct ctdb_marshall_buffer *m2 = NULL;
242
243         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
244         if (r == NULL) {
245                 talloc_free(m);
246                 return NULL;
247         }
248
249         if (m == NULL) {
250                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
251                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
252                 if (m == NULL) {
253                         goto done;
254                 }
255                 m->db_id = db_id;
256         }
257
258         m_size = talloc_get_size(m);
259         r_size = talloc_get_size(r);
260
261         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
262                 mem_ctx, m,  m_size + r_size);
263         if (m2 == NULL) {
264                 talloc_free(m);
265                 goto done;
266         }
267
268         memcpy(m_size + (uint8_t *)m2, r, r_size);
269
270         m2->count++;
271
272 done:
273         talloc_free(r);
274         return m2;
275 }
276
277 /* we've finished marshalling, return a data blob with the marshalled records */
278 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
279 {
280         TDB_DATA data;
281         data.dptr = (uint8_t *)m;
282         data.dsize = talloc_get_size(m);
283         return data;
284 }
285
286 /*
287    loop over a marshalling buffer
288
289      - pass r==NULL to start
290      - loop the number of times indicated by m->count
291 */
292 static struct ctdb_rec_data_old *db_ctdb_marshall_loop_next_key(
293         struct ctdb_marshall_buffer *m, struct ctdb_rec_data_old *r, TDB_DATA *key)
294 {
295         if (r == NULL) {
296                 r = (struct ctdb_rec_data_old *)&m->data[0];
297         } else {
298                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
299         }
300
301         key->dptr   = &r->data[0];
302         key->dsize  = r->keylen;
303         return r;
304 }
305
306 static bool db_ctdb_marshall_buf_parse(
307         struct ctdb_rec_data_old *r, uint32_t *reqid,
308         struct ctdb_ltdb_header **header, TDB_DATA *data)
309 {
310         if (r->datalen < sizeof(struct ctdb_ltdb_header)) {
311                 return false;
312         }
313
314         *reqid = r->reqid;
315
316         data->dptr  = &r->data[r->keylen] + sizeof(struct ctdb_ltdb_header);
317         data->dsize = r->datalen - sizeof(struct ctdb_ltdb_header);
318
319         *header = (struct ctdb_ltdb_header *)&r->data[r->keylen];
320
321         return true;
322 }
323
324 /**
325  * CTDB transaction destructor
326  */
327 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
328 {
329         NTSTATUS status;
330
331         status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
332         if (!NT_STATUS_IS_OK(status)) {
333                 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
334                           nt_errstr(status)));
335                 return -1;
336         }
337         return 0;
338 }
339
340 /**
341  * CTDB dbwrap API: transaction_start function
342  * starts a transaction on a persistent database
343  */
344 static int db_ctdb_transaction_start(struct db_context *db)
345 {
346         struct db_ctdb_transaction_handle *h;
347         NTSTATUS status;
348         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
349                                                         struct db_ctdb_ctx);
350
351         if (!db->persistent) {
352                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
353                          ctx->db_id));
354                 return -1;
355         }
356
357         if (ctx->transaction) {
358                 ctx->transaction->nesting++;
359                 DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
360                           ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
361                 return 0;
362         }
363
364         h = talloc_zero(db, struct db_ctdb_transaction_handle);
365         if (h == NULL) {
366                 DEBUG(0,(__location__ " oom for transaction handle\n"));
367                 return -1;
368         }
369
370         h->ctx = ctx;
371
372         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
373                                        (unsigned int)ctx->db_id);
374         if (h->lock_name == NULL) {
375                 DEBUG(0, ("talloc_asprintf failed\n"));
376                 TALLOC_FREE(h);
377                 return -1;
378         }
379
380         /*
381          * Wait a day, i.e. forever...
382          */
383         status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
384                              timeval_set(86400, 0));
385         if (!NT_STATUS_IS_OK(status)) {
386                 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
387                 TALLOC_FREE(h);
388                 return -1;
389         }
390
391         talloc_set_destructor(h, db_ctdb_transaction_destructor);
392
393         ctx->transaction = h;
394
395         DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
396
397         return 0;
398 }
399
400 static bool parse_newest_in_marshall_buffer(
401         struct ctdb_marshall_buffer *buf, TDB_DATA key,
402         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
403                        TDB_DATA data, void *private_data),
404         void *private_data)
405 {
406         struct ctdb_rec_data_old *rec = NULL;
407         struct ctdb_ltdb_header *h = NULL;
408         TDB_DATA data;
409         uint32_t i;
410
411         if (buf == NULL) {
412                 return false;
413         }
414
415         /*
416          * Walk the list of records written during this
417          * transaction. If we want to read one we have already
418          * written, return the last written sample. Thus we do not do
419          * a "break;" for the first hit, this record might have been
420          * overwritten later.
421          */
422
423         for (i=0; i<buf->count; i++) {
424                 TDB_DATA tkey;
425                 uint32_t reqid;
426
427                 rec = db_ctdb_marshall_loop_next_key(buf, rec, &tkey);
428                 if (rec == NULL) {
429                         return false;
430                 }
431
432                 if (!tdb_data_equal(key, tkey)) {
433                         continue;
434                 }
435
436                 if (!db_ctdb_marshall_buf_parse(rec, &reqid, &h, &data)) {
437                         return false;
438                 }
439         }
440
441         if (h == NULL) {
442                 return false;
443         }
444
445         parser(key, h, data, private_data);
446
447         return true;
448 }
449
/*
 * Output locations for pull_newest_from_marshall_buffer_parser().
 */
struct pull_newest_from_marshall_buffer_state {
        /* where to copy the record header to, NULL if not wanted */
        struct ctdb_ltdb_header *pheader;
        /* talloc parent for the copy of the record data */
        TALLOC_CTX *mem_ctx;
        /* where to return the copied record data, NULL if not wanted */
        TDB_DATA *pdata;
};
455
456 static void pull_newest_from_marshall_buffer_parser(
457         TDB_DATA key, struct ctdb_ltdb_header *header,
458         TDB_DATA data, void *private_data)
459 {
460         struct pull_newest_from_marshall_buffer_state *state =
461                 (struct pull_newest_from_marshall_buffer_state *)private_data;
462
463         if (state->pheader != NULL) {
464                 memcpy(state->pheader, header, sizeof(*state->pheader));
465         }
466         if (state->pdata != NULL) {
467                 state->pdata->dsize = data.dsize;
468                 state->pdata->dptr = (uint8_t *)talloc_memdup(
469                         state->mem_ctx, data.dptr, data.dsize);
470         }
471 }
472
473 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
474                                              TDB_DATA key,
475                                              struct ctdb_ltdb_header *pheader,
476                                              TALLOC_CTX *mem_ctx,
477                                              TDB_DATA *pdata)
478 {
479         struct pull_newest_from_marshall_buffer_state state;
480
481         state.pheader = pheader;
482         state.mem_ctx = mem_ctx;
483         state.pdata = pdata;
484
485         if (!parse_newest_in_marshall_buffer(
486                     buf, key, pull_newest_from_marshall_buffer_parser,
487                     &state)) {
488                 return false;
489         }
490         if ((pdata != NULL) && (pdata->dsize != 0) && (pdata->dptr == NULL)) {
491                 /* ENOMEM */
492                 return false;
493         }
494         return true;
495 }
496
/*
 * Forward declarations: db_ctdb_fetch_locked_transaction() below
 * installs these as record callbacks before they are defined.
 */
static NTSTATUS db_ctdb_storev_transaction(struct db_record *rec,
                                           const TDB_DATA *dbufs, int num_dbufs,
                                           int flag);
static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
501
502 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
503                                                           TALLOC_CTX *mem_ctx,
504                                                           TDB_DATA key)
505 {
506         struct db_record *result;
507         TDB_DATA ctdb_data;
508
509         if (!(result = talloc(mem_ctx, struct db_record))) {
510                 DEBUG(0, ("talloc failed\n"));
511                 return NULL;
512         }
513
514         result->db = ctx->db;
515         result->private_data = ctx->transaction;
516
517         result->key.dsize = key.dsize;
518         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
519                                                     key.dsize);
520         if (result->key.dptr == NULL) {
521                 DEBUG(0, ("talloc failed\n"));
522                 TALLOC_FREE(result);
523                 return NULL;
524         }
525
526         result->storev = db_ctdb_storev_transaction;
527         result->delete_rec = db_ctdb_delete_transaction;
528
529         if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
530                                              NULL, result, &result->value)) {
531                 return result;
532         }
533
534         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
535         if (ctdb_data.dptr == NULL) {
536                 /* create the record */
537                 result->value = tdb_null;
538                 return result;
539         }
540
541         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
542         result->value.dptr = NULL;
543
544         if ((result->value.dsize != 0)
545             && !(result->value.dptr = (uint8_t *)talloc_memdup(
546                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
547                          result->value.dsize))) {
548                 DEBUG(0, ("talloc failed\n"));
549                 TALLOC_FREE(result);
550         }
551
552         SAFE_FREE(ctdb_data.dptr);
553
554         return result;
555 }
556
557 static int db_ctdb_record_destructor(struct db_record **recp)
558 {
559         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
560         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
561                 rec->private_data, struct db_ctdb_transaction_handle);
562         int ret = h->ctx->db->transaction_commit(h->ctx->db);
563         if (ret != 0) {
564                 DEBUG(0,(__location__ " transaction_commit failed\n"));
565         }
566         return 0;
567 }
568
569 /*
570   auto-create a transaction for persistent databases
571  */
572 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
573                                                          TALLOC_CTX *mem_ctx,
574                                                          TDB_DATA key)
575 {
576         int res;
577         struct db_record *rec, **recp;
578
579         res = db_ctdb_transaction_start(ctx->db);
580         if (res == -1) {
581                 return NULL;
582         }
583
584         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
585         if (rec == NULL) {
586                 ctx->db->transaction_cancel(ctx->db);
587                 return NULL;
588         }
589
590         /* destroy this transaction when we release the lock */
591         recp = talloc(rec, struct db_record *);
592         if (recp == NULL) {
593                 ctx->db->transaction_cancel(ctx->db);
594                 talloc_free(rec);
595                 return NULL;
596         }
597         *recp = rec;
598         talloc_set_destructor(recp, db_ctdb_record_destructor);
599         return rec;
600 }
601
602
603 /*
604   stores a record inside a transaction
605  */
606 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
607                                           TDB_DATA key, TDB_DATA data)
608 {
609         TALLOC_CTX *tmp_ctx = talloc_new(h);
610         TDB_DATA rec;
611         struct ctdb_ltdb_header header;
612
613         ZERO_STRUCT(header);
614
615         /* we need the header so we can update the RSN */
616
617         if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
618                                               NULL, NULL)) {
619
620                 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
621
622                 if (rec.dptr != NULL) {
623                         memcpy(&header, rec.dptr,
624                                sizeof(struct ctdb_ltdb_header));
625                         rec.dsize -= sizeof(struct ctdb_ltdb_header);
626
627                         /*
628                          * a special case, we are writing the same
629                          * data that is there now
630                          */
631                         if (data.dsize == rec.dsize &&
632                             memcmp(data.dptr,
633                                    rec.dptr + sizeof(struct ctdb_ltdb_header),
634                                    data.dsize) == 0) {
635                                 SAFE_FREE(rec.dptr);
636                                 talloc_free(tmp_ctx);
637                                 return NT_STATUS_OK;
638                         }
639                 }
640                 SAFE_FREE(rec.dptr);
641         }
642
643         header.dmaster = get_my_vnn();
644         header.rsn++;
645
646         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
647         if (h->m_write == NULL) {
648                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
649                 talloc_free(tmp_ctx);
650                 return NT_STATUS_NO_MEMORY;
651         }
652
653         talloc_free(tmp_ctx);
654         return NT_STATUS_OK;
655 }
656
657
658 /* 
659    a record store inside a transaction
660  */
661 static NTSTATUS db_ctdb_storev_transaction(
662         struct db_record *rec, const TDB_DATA *dbufs, int num_dbufs, int flag)
663 {
664         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
665                 rec->private_data, struct db_ctdb_transaction_handle);
666         NTSTATUS status;
667         TDB_DATA data;
668
669         data = dbwrap_merge_dbufs(rec, dbufs, num_dbufs);
670         if (data.dptr == NULL) {
671                 return NT_STATUS_NO_MEMORY;
672         }
673
674         status = db_ctdb_transaction_store(h, rec->key, data);
675
676         TALLOC_FREE(data.dptr);
677
678         return status;
679 }
680
681 /*
682    a record delete inside a transaction
683  */
684 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
685 {
686         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
687                 rec->private_data, struct db_ctdb_transaction_handle);
688         NTSTATUS status;
689
690         status =  db_ctdb_transaction_store(h, rec->key, tdb_null);
691         return status;
692 }
693
694 static void db_ctdb_fetch_db_seqnum_parser(
695         TDB_DATA key, struct ctdb_ltdb_header *header,
696         TDB_DATA data, void *private_data)
697 {
698         uint64_t *seqnum = (uint64_t *)private_data;
699
700         if (data.dsize != sizeof(uint64_t)) {
701                 *seqnum = 0;
702                 return;
703         }
704         memcpy(seqnum, data.dptr, sizeof(*seqnum));
705 }
706
707 /**
708  * Fetch the db sequence number of a persistent db directly from the db.
709  */
710 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
711                                                 uint64_t *seqnum)
712 {
713         NTSTATUS status;
714         TDB_DATA key;
715
716         if (seqnum == NULL) {
717                 return NT_STATUS_INVALID_PARAMETER;
718         }
719
720         key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
721
722         status = db_ctdb_ltdb_parse(
723                 db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
724
725         if (NT_STATUS_IS_OK(status)) {
726                 return NT_STATUS_OK;
727         }
728         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
729                 *seqnum = 0;
730                 return NT_STATUS_OK;
731         }
732         return status;
733 }
734
735 /**
736  * Store the database sequence number inside a transaction.
737  */
738 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
739                                         uint64_t seqnum)
740 {
741         NTSTATUS status;
742         const char *keyname = CTDB_DB_SEQNUM_KEY;
743         TDB_DATA key;
744         TDB_DATA data;
745
746         key = string_term_tdb_data(keyname);
747
748         data.dptr = (uint8_t *)&seqnum;
749         data.dsize = sizeof(uint64_t);
750
751         status = db_ctdb_transaction_store(h, key, data);
752
753         return status;
754 }
755
756 /*
757   commit a transaction
758  */
759 static int db_ctdb_transaction_commit(struct db_context *db)
760 {
761         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
762                                                         struct db_ctdb_ctx);
763         NTSTATUS rets;
764         int32_t status;
765         struct db_ctdb_transaction_handle *h = ctx->transaction;
766         uint64_t old_seqnum, new_seqnum;
767         int ret;
768
769         if (h == NULL) {
770                 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
771                 return -1;
772         }
773
774         if (h->nested_cancel) {
775                 db->transaction_cancel(db);
776                 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
777                 return -1;
778         }
779
780         if (h->nesting != 0) {
781                 h->nesting--;
782                 DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
783                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
784                 return 0;
785         }
786
787         if (h->m_write == NULL) {
788                 /*
789                  * No changes were made, so don't change the seqnum,
790                  * don't push to other node, just exit with success.
791                  */
792                 ret = 0;
793                 goto done;
794         }
795
796         DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));
797
798         /*
799          * As the last db action before committing, bump the database sequence
800          * number. Note that this undoes all changes to the seqnum records
801          * performed under the transaction. This record is not meant to be
802          * modified by user interaction. It is for internal use only...
803          */
804         rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
805         if (!NT_STATUS_IS_OK(rets)) {
806                 DEBUG(1, (__location__ " failed to fetch the db sequence number "
807                           "in transaction commit on db 0x%08x\n", ctx->db_id));
808                 ret = -1;
809                 goto done;
810         }
811
812         new_seqnum = old_seqnum + 1;
813
814         rets = db_ctdb_store_db_seqnum(h, new_seqnum);
815         if (!NT_STATUS_IS_OK(rets)) {
816                 DEBUG(1, (__location__ "failed to store the db sequence number "
817                           " in transaction commit on db 0x%08x\n", ctx->db_id));
818                 ret = -1;
819                 goto done;
820         }
821
822 again:
823         /* tell ctdbd to commit to the other nodes */
824         ret = ctdbd_control_local(messaging_ctdb_connection(),
825                                   CTDB_CONTROL_TRANS3_COMMIT,
826                                   h->ctx->db_id, 0,
827                                   db_ctdb_marshall_finish(h->m_write),
828                                   NULL, NULL, &status);
829         if ((ret != 0) || status != 0) {
830                 /*
831                  * The TRANS3_COMMIT control should only possibly fail when a
832                  * recovery has been running concurrently. In any case, the db
833                  * will be the same on all nodes, either the new copy or the
834                  * old copy.  This can be detected by comparing the old and new
835                  * local sequence numbers.
836                  */
837                 rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
838                 if (!NT_STATUS_IS_OK(rets)) {
839                         DEBUG(1, (__location__ " failed to refetch db sequence "
840                                   "number after failed TRANS3_COMMIT\n"));
841                         ret = -1;
842                         goto done;
843                 }
844
845                 if (new_seqnum == old_seqnum) {
846                         /* Recovery prevented all our changes: retry. */
847                         goto again;
848                 }
849                 if (new_seqnum != (old_seqnum + 1)) {
850                         DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
851                                   "old_seqnum[%lu] + (0 or 1) after failed "
852                                   "TRANS3_COMMIT - this should not happen!\n",
853                                   (unsigned long)new_seqnum,
854                                   (unsigned long)old_seqnum));
855                         ret = -1;
856                         goto done;
857                 }
858                 /*
859                  * Recovery propagated our changes to all nodes, completing
860                  * our commit for us - succeed.
861                  */
862         }
863
864         ret = 0;
865
866 done:
867         h->ctx->transaction = NULL;
868         talloc_free(h);
869         return ret;
870 }
871
872
873 /*
874   cancel a transaction
875  */
876 static int db_ctdb_transaction_cancel(struct db_context *db)
877 {
878         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
879                                                         struct db_ctdb_ctx);
880         struct db_ctdb_transaction_handle *h = ctx->transaction;
881
882         if (h == NULL) {
883                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
884                 return -1;
885         }
886
887         if (h->nesting != 0) {
888                 h->nesting--;
889                 h->nested_cancel = true;
890                 DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
891                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
892                 return 0;
893         }
894
895         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
896
897         ctx->transaction = NULL;
898         talloc_free(h);
899         return 0;
900 }
901
902
903 static NTSTATUS db_ctdb_storev(struct db_record *rec,
904                                const TDB_DATA *dbufs, int num_dbufs, int flag)
905 {
906         struct db_ctdb_rec *crec = talloc_get_type_abort(
907                 rec->private_data, struct db_ctdb_rec);
908         NTSTATUS status;
909
910         status = db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header),
911                                     dbufs, num_dbufs);
912         return status;
913 }
914
915
916
917 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
918 {
919         NTSTATUS status = NT_STATUS_OK;
920         int ret;
921         struct ctdb_control_schedule_for_deletion *dd;
922         TDB_DATA indata;
923         int32_t cstatus;
924         struct db_ctdb_rec *crec = talloc_get_type_abort(
925                 rec->private_data, struct db_ctdb_rec);
926         struct db_ctdb_ctx *ctx = crec->ctdb_ctx;
927
928         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + rec->key.dsize;
929         indata.dptr = talloc_zero_array(crec, uint8_t, indata.dsize);
930         if (indata.dptr == NULL) {
931                 DEBUG(0, (__location__ " talloc failed!\n"));
932                 return NT_STATUS_NO_MEMORY;
933         }
934
935         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
936         dd->db_id = ctx->db_id;
937         dd->hdr = crec->header;
938         dd->keylen = rec->key.dsize;
939         memcpy(dd->key, rec->key.dptr, rec->key.dsize);
940
941         ret = ctdbd_control_local(messaging_ctdb_connection(),
942                                   CTDB_CONTROL_SCHEDULE_FOR_DELETION,
943                                   crec->ctdb_ctx->db_id,
944                                   CTDB_CTRL_FLAG_NOREPLY, /* flags */
945                                   indata,
946                                   NULL, /* outdata */
947                                   NULL, /* errmsg */
948                                   &cstatus);
949         talloc_free(indata.dptr);
950
951         if ((ret != 0) || cstatus != 0) {
952                 DEBUG(1, (__location__ " Error sending local control "
953                           "SCHEDULE_FOR_DELETION: %s, cstatus = %"PRIi32"\n",
954                           strerror(ret), cstatus));
955                 if (ret != 0) {
956                         status = map_nt_error_from_unix(ret);
957                 } else {
958                         status = NT_STATUS_UNSUCCESSFUL;
959                 }
960         }
961
962         return status;
963 }
964
965 static NTSTATUS db_ctdb_delete(struct db_record *rec)
966 {
967         NTSTATUS status;
968
969         /*
970          * We have to store the header with empty data. TODO: Fix the
971          * tdb-level cleanup
972          */
973
974         status = db_ctdb_storev(rec, &tdb_null, 1, 0);
975         if (!NT_STATUS_IS_OK(status)) {
976                 return status;
977         }
978
979         status = db_ctdb_send_schedule_for_deletion(rec);
980         return status;
981 }
982
983 static int db_ctdb_record_destr(struct db_record* data)
984 {
985         struct db_ctdb_rec *crec = talloc_get_type_abort(
986                 data->private_data, struct db_ctdb_rec);
987         int threshold;
988         int ret;
989         struct timeval before;
990         double timediff;
991
992         DEBUG(10, (DEBUGLEVEL > 10
993                    ? "Unlocking db %u key %s\n"
994                    : "Unlocking db %u key %.20s\n",
995                    (int)crec->ctdb_ctx->db_id,
996                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
997                               data->key.dsize)));
998
999         before = timeval_current();
1000
1001         ret = tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
1002
1003         timediff = timeval_elapsed(&before);
1004         timediff *= 1000;       /* get us milliseconds */
1005
1006         if (timediff > crec->ctdb_ctx->warn_unlock_msecs) {
1007                 char *key;
1008                 key = hex_encode_talloc(talloc_tos(),
1009                                         (unsigned char *)data->key.dptr,
1010                                         data->key.dsize);
1011                 DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
1012                           tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
1013                           timediff));
1014                 TALLOC_FREE(key);
1015         }
1016
1017         if (ret != 0) {
1018                 DEBUG(0, ("tdb_chainunlock failed\n"));
1019                 return -1;
1020         }
1021
1022         threshold = crec->ctdb_ctx->warn_locktime_msecs;
1023         if (threshold != 0) {
1024                 timediff = timeval_elapsed(&crec->lock_time) * 1000;
1025                 if (timediff > threshold) {
1026                         const char *key;
1027
1028                         key = hex_encode_talloc(data,
1029                                                 (unsigned char *)data->key.dptr,
1030                                                 data->key.dsize);
1031                         DEBUG(0, ("Held tdb lock on db %s, key %s "
1032                                   "%f milliseconds\n",
1033                                   tdb_name(crec->ctdb_ctx->wtdb->tdb),
1034                                   key, timediff));
1035                 }
1036         }
1037
1038         return 0;
1039 }
1040
1041 /**
1042  * Check whether we have a valid local copy of the given record,
1043  * either for reading or for writing.
1044  */
1045 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
1046                                       uint32_t my_vnn, bool read_only)
1047 {
1048         if (hdr->dmaster != my_vnn) {
1049                 /* If we're not dmaster, it must be r/o copy. */
1050                 return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
1051         }
1052
1053         /*
1054          * If we want write access, no one may have r/o copies.
1055          */
1056         return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
1057 }
1058
1059 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, uint32_t my_vnn,
1060                                        bool read_only)
1061 {
1062         if (ctdb_data.dptr == NULL) {
1063                 return false;
1064         }
1065
1066         if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
1067                 return false;
1068         }
1069
1070         return db_ctdb_can_use_local_hdr(
1071                 (struct ctdb_ltdb_header *)ctdb_data.dptr, my_vnn, read_only);
1072 }
1073
1074 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
1075                                                TALLOC_CTX *mem_ctx,
1076                                                TDB_DATA key,
1077                                                bool tryonly)
1078 {
1079         struct db_record *result;
1080         struct db_ctdb_rec *crec;
1081         TDB_DATA ctdb_data;
1082         int migrate_attempts;
1083         struct timeval migrate_start;
1084         struct timeval chainlock_start;
1085         struct timeval ctdb_start_time;
1086         double chainlock_time = 0;
1087         double ctdb_time = 0;
1088         int duration_msecs;
1089         int lockret;
1090         int ret;
1091
1092         if (!(result = talloc(mem_ctx, struct db_record))) {
1093                 DEBUG(0, ("talloc failed\n"));
1094                 return NULL;
1095         }
1096
1097         if (!(crec = talloc_zero(result, struct db_ctdb_rec))) {
1098                 DEBUG(0, ("talloc failed\n"));
1099                 TALLOC_FREE(result);
1100                 return NULL;
1101         }
1102
1103         result->db = ctx->db;
1104         result->private_data = (void *)crec;
1105         crec->ctdb_ctx = ctx;
1106
1107         result->key.dsize = key.dsize;
1108         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
1109                                                     key.dsize);
1110         if (result->key.dptr == NULL) {
1111                 DEBUG(0, ("talloc failed\n"));
1112                 TALLOC_FREE(result);
1113                 return NULL;
1114         }
1115
1116         migrate_attempts = 0;
1117         GetTimeOfDay(&migrate_start);
1118
1119         /*
1120          * Do a blocking lock on the record
1121          */
1122 again:
1123
1124         if (DEBUGLEVEL >= 10) {
1125                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1126                 DEBUG(10, (DEBUGLEVEL > 10
1127                            ? "Locking db %u key %s\n"
1128                            : "Locking db %u key %.20s\n",
1129                            (int)crec->ctdb_ctx->db_id, keystr));
1130                 TALLOC_FREE(keystr);
1131         }
1132
1133         GetTimeOfDay(&chainlock_start);
1134         lockret = tryonly
1135                 ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
1136                 : tdb_chainlock(ctx->wtdb->tdb, key);
1137         chainlock_time += timeval_elapsed(&chainlock_start);
1138
1139         if (lockret != 0) {
1140                 DEBUG(3, ("tdb_chainlock failed\n"));
1141                 TALLOC_FREE(result);
1142                 return NULL;
1143         }
1144
1145         result->storev = db_ctdb_storev;
1146         result->delete_rec = db_ctdb_delete;
1147         talloc_set_destructor(result, db_ctdb_record_destr);
1148
1149         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1150
1151         /*
1152          * See if we have a valid record and we are the dmaster. If so, we can
1153          * take the shortcut and just return it.
1154          */
1155
1156         if (!db_ctdb_can_use_local_copy(ctdb_data, get_my_vnn(), false)) {
1157                 SAFE_FREE(ctdb_data.dptr);
1158                 tdb_chainunlock(ctx->wtdb->tdb, key);
1159                 talloc_set_destructor(result, NULL);
1160
1161                 if (tryonly && (migrate_attempts != 0)) {
1162                         DEBUG(5, ("record migrated away again\n"));
1163                         TALLOC_FREE(result);
1164                         return NULL;
1165                 }
1166
1167                 migrate_attempts += 1;
1168
1169                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %"PRIu32" "
1170                            "(%"PRIu32") %"PRIu32"\n",
1171                            ctdb_data.dptr, ctdb_data.dptr ?
1172                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster :
1173                            UINT32_MAX,
1174                            get_my_vnn(),
1175                            ctdb_data.dptr ?
1176                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
1177
1178                 GetTimeOfDay(&ctdb_start_time);
1179                 ret = ctdbd_migrate(messaging_ctdb_connection(), ctx->db_id,
1180                                     key);
1181                 ctdb_time += timeval_elapsed(&ctdb_start_time);
1182
1183                 if (ret != 0) {
1184                         DEBUG(5, ("ctdbd_migrate failed: %s\n",
1185                                   strerror(ret)));
1186                         TALLOC_FREE(result);
1187                         return NULL;
1188                 }
1189                 /* now its migrated, try again */
1190                 goto again;
1191         }
1192
1193         {
1194                 double duration;
1195                 duration = timeval_elapsed(&migrate_start);
1196
1197                 /*
1198                  * Convert the duration to milliseconds to avoid a
1199                  * floating-point division of
1200                  * lp_parm_int("migrate_duration") by 1000.
1201                  */
1202                 duration_msecs = duration * 1000;
1203         }
1204
1205         if ((migrate_attempts > ctx->warn_migrate_attempts) ||
1206             (duration_msecs > ctx->warn_migrate_msecs)) {
1207                 int chain = 0;
1208
1209                 if (tdb_get_flags(ctx->wtdb->tdb) & TDB_INCOMPATIBLE_HASH) {
1210                         chain = tdb_jenkins_hash(&key) %
1211                                 tdb_hash_size(ctx->wtdb->tdb);
1212                 }
1213
1214                 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
1215                           "needed %d attempts, %d milliseconds, "
1216                           "chainlock: %f ms, CTDB %f ms\n",
1217                           tdb_name(ctx->wtdb->tdb),
1218                           hex_encode_talloc(talloc_tos(),
1219                                             (unsigned char *)key.dptr,
1220                                             key.dsize),
1221                           chain,
1222                           migrate_attempts, duration_msecs,
1223                           chainlock_time * 1000.0,
1224                           ctdb_time * 1000.0));
1225         }
1226
1227         GetTimeOfDay(&crec->lock_time);
1228
1229         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1230
1231         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1232         result->value.dptr = NULL;
1233
1234         if ((result->value.dsize != 0)
1235             && !(result->value.dptr = (uint8_t *)talloc_memdup(
1236                          result, ctdb_data.dptr + sizeof(crec->header),
1237                          result->value.dsize))) {
1238                 DEBUG(0, ("talloc failed\n"));
1239                 TALLOC_FREE(result);
1240         }
1241
1242         SAFE_FREE(ctdb_data.dptr);
1243
1244         return result;
1245 }
1246
1247 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1248                                               TALLOC_CTX *mem_ctx,
1249                                               TDB_DATA key)
1250 {
1251         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1252                                                         struct db_ctdb_ctx);
1253
1254         if (ctx->transaction != NULL) {
1255                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1256         }
1257
1258         if (db->persistent) {
1259                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1260         }
1261
1262         return fetch_locked_internal(ctx, mem_ctx, key, false);
1263 }
1264
1265 static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
1266                                                   TALLOC_CTX *mem_ctx,
1267                                                   TDB_DATA key)
1268 {
1269         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1270                                                         struct db_ctdb_ctx);
1271
1272         if (ctx->transaction != NULL) {
1273                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1274         }
1275
1276         if (db->persistent) {
1277                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1278         }
1279
1280         return fetch_locked_internal(ctx, mem_ctx, key, true);
1281 }
1282
1283 struct db_ctdb_parse_record_state {
1284         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
1285         void *private_data;
1286         uint32_t my_vnn;
1287         bool ask_for_readonly_copy;
1288         bool done;
1289         bool empty_record;
1290 };
1291
1292 static void db_ctdb_parse_record_parser(
1293         TDB_DATA key, struct ctdb_ltdb_header *header,
1294         TDB_DATA data, void *private_data)
1295 {
1296         struct db_ctdb_parse_record_state *state =
1297                 (struct db_ctdb_parse_record_state *)private_data;
1298         state->parser(key, data, state->private_data);
1299 }
1300
1301 static void db_ctdb_parse_record_parser_nonpersistent(
1302         TDB_DATA key, struct ctdb_ltdb_header *header,
1303         TDB_DATA data, void *private_data)
1304 {
1305         struct db_ctdb_parse_record_state *state =
1306                 (struct db_ctdb_parse_record_state *)private_data;
1307
1308         if (db_ctdb_can_use_local_hdr(header, state->my_vnn, true)) {
1309                 /*
1310                  * A record consisting only of the ctdb header can be
1311                  * a validly created empty record or a tombstone
1312                  * record of a deleted record (not vacuumed yet). Mark
1313                  * it accordingly.
1314                  */
1315                 state->empty_record = (data.dsize == 0);
1316                 if (!state->empty_record) {
1317                         state->parser(key, data, state->private_data);
1318                 }
1319                 state->done = true;
1320         } else {
1321                 /*
1322                  * We found something in the db, so it seems that this record,
1323                  * while not usable locally right now, is popular. Ask for a
1324                  * R/O copy.
1325                  */
1326                 state->ask_for_readonly_copy = true;
1327         }
1328 }
1329
1330 static NTSTATUS db_ctdb_try_parse_local_record(struct db_ctdb_ctx *ctx,
1331                                                TDB_DATA key,
1332                                                struct db_ctdb_parse_record_state *state)
1333 {
1334         NTSTATUS status;
1335
1336         if (ctx->transaction != NULL) {
1337                 struct db_ctdb_transaction_handle *h = ctx->transaction;
1338                 bool found;
1339
1340                 /*
1341                  * Transactions only happen for persistent db's.
1342                  */
1343
1344                 found = parse_newest_in_marshall_buffer(
1345                         h->m_write, key, db_ctdb_parse_record_parser, state);
1346
1347                 if (found) {
1348                         return NT_STATUS_OK;
1349                 }
1350         }
1351
1352         if (ctx->db->persistent) {
1353                 /*
1354                  * Persistent db, but not found in the transaction buffer
1355                  */
1356                 return db_ctdb_ltdb_parse(
1357                         ctx, key, db_ctdb_parse_record_parser, state);
1358         }
1359
1360         state->done = false;
1361         state->ask_for_readonly_copy = false;
1362
1363         status = db_ctdb_ltdb_parse(
1364                 ctx, key, db_ctdb_parse_record_parser_nonpersistent, state);
1365         if (NT_STATUS_IS_OK(status) && state->done) {
1366                 if (state->empty_record) {
1367                         /*
1368                          * We know authoritatively, that this is an empty
1369                          * record. Since ctdb does not distinguish between empty
1370                          * and deleted records, this can be a record stored as
1371                          * empty or a not-yet-vacuumed tombstone record of a
1372                          * deleted record. Now Samba right now can live without
1373                          * empty records, so we can safely report this record
1374                          * as non-existing.
1375                          *
1376                          * See bugs 10008 and 12005.
1377                          */
1378                         return NT_STATUS_NOT_FOUND;
1379                 }
1380                 return NT_STATUS_OK;
1381         }
1382
1383         return NT_STATUS_MORE_PROCESSING_REQUIRED;
1384 }
1385
1386 static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
1387                                      void (*parser)(TDB_DATA key,
1388                                                     TDB_DATA data,
1389                                                     void *private_data),
1390                                      void *private_data)
1391 {
1392         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1393                 db->private_data, struct db_ctdb_ctx);
1394         struct db_ctdb_parse_record_state state;
1395         NTSTATUS status;
1396         int ret;
1397
1398         state.parser = parser;
1399         state.private_data = private_data;
1400         state.my_vnn = get_my_vnn();
1401         state.empty_record = false;
1402
1403         status = db_ctdb_try_parse_local_record(ctx, key, &state);
1404         if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
1405                 return status;
1406         }
1407
1408         ret = ctdbd_parse(messaging_ctdb_connection(), ctx->db_id, key,
1409                           state.ask_for_readonly_copy, parser, private_data);
1410         if (ret != 0) {
1411                 if (ret == ENOENT) {
1412                         /*
1413                          * This maps to
1414                          * NT_STATUS_OBJECT_NAME_NOT_FOUND. Our upper
1415                          * layers expect NT_STATUS_NOT_FOUND for "no
1416                          * record around". We need to convert dbwrap
1417                          * to 0/errno away from NTSTATUS ... :-)
1418                          */
1419                         return NT_STATUS_NOT_FOUND;
1420                 }
1421                 return map_nt_error_from_unix(ret);
1422         }
1423         return NT_STATUS_OK;
1424 }
1425
1426 static void db_ctdb_parse_record_done(struct tevent_req *subreq);
1427
1428 static struct tevent_req *db_ctdb_parse_record_send(
1429         TALLOC_CTX *mem_ctx,
1430         struct tevent_context *ev,
1431         struct db_context *db,
1432         TDB_DATA key,
1433         void (*parser)(TDB_DATA key,
1434                        TDB_DATA data,
1435                        void *private_data),
1436         void *private_data,
1437         enum dbwrap_req_state *req_state)
1438 {
1439         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1440                 db->private_data, struct db_ctdb_ctx);
1441         struct tevent_req *req = NULL;
1442         struct tevent_req *subreq = NULL;
1443         struct db_ctdb_parse_record_state *state = NULL;
1444         NTSTATUS status;
1445
1446         req = tevent_req_create(mem_ctx, &state,
1447                                 struct db_ctdb_parse_record_state);
1448         if (req == NULL) {
1449                 *req_state = DBWRAP_REQ_ERROR;
1450                 return NULL;
1451
1452         }
1453
1454         *state = (struct db_ctdb_parse_record_state) {
1455                 .parser = parser,
1456                 .private_data = private_data,
1457                 .my_vnn = get_my_vnn(),
1458                 .empty_record = false,
1459         };
1460
1461         status = db_ctdb_try_parse_local_record(ctx, key, state);
1462         if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
1463                 if (tevent_req_nterror(req, status)) {
1464                         *req_state = DBWRAP_REQ_ERROR;
1465                         return tevent_req_post(req, ev);
1466                 }
1467                 *req_state = DBWRAP_REQ_DONE;
1468                 tevent_req_done(req);
1469                 return tevent_req_post(req, ev);
1470         }
1471
1472         subreq = ctdbd_parse_send(state,
1473                                   ev,
1474                                   ctdb_async_ctx.async_conn,
1475                                   ctx->db_id,
1476                                   key,
1477                                   state->ask_for_readonly_copy,
1478                                   parser,
1479                                   private_data,
1480                                   req_state);
1481         if (tevent_req_nomem(subreq, req)) {
1482                 *req_state = DBWRAP_REQ_ERROR;
1483                 return tevent_req_post(req, ev);
1484         }
1485         tevent_req_set_callback(subreq, db_ctdb_parse_record_done, req);
1486
1487         return req;
1488 }
1489
1490 static void db_ctdb_parse_record_done(struct tevent_req *subreq)
1491 {
1492         struct tevent_req *req = tevent_req_callback_data(
1493                 subreq, struct tevent_req);
1494         int ret;
1495
1496         ret = ctdbd_parse_recv(subreq);
1497         TALLOC_FREE(subreq);
1498         if (ret != 0) {
1499                 if (ret == ENOENT) {
1500                         /*
1501                          * This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
1502                          * upper layers expect NT_STATUS_NOT_FOUND for "no
1503                          * record around". We need to convert dbwrap to 0/errno
1504                          * away from NTSTATUS ... :-)
1505                          */
1506                         tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
1507                         return;
1508                 }
1509                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
1510                 return;
1511         }
1512
1513         tevent_req_done(req);
1514         return;
1515 }
1516
1517 static NTSTATUS db_ctdb_parse_record_recv(struct tevent_req *req)
1518 {
1519         return tevent_req_simple_recv_ntstatus(req);
1520 }
1521
/* State handed to the traverse callbacks through private_data. */
struct traverse_state {
	/* Database being traversed. */
	struct db_context *db;

	/* Caller's per-record function and its argument. */
	int (*fn)(struct db_record *rec, void *private_data);
	void *private_data;

	/* Record counter; db_ctdb_traverse() starts it at 0. */
	int count;
};
1528
1529 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1530 {
1531         struct traverse_state *state = (struct traverse_state *)private_data;
1532         struct db_record *rec;
1533         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1534         /* we have to give them a locked record to prevent races */
1535         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1536         if (rec && rec->value.dsize > 0) {
1537                 state->fn(rec, state->private_data);
1538         }
1539         talloc_free(tmp_ctx);
1540 }
1541
1542 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1543                                         void *private_data)
1544 {
1545         struct traverse_state *state = (struct traverse_state *)private_data;
1546         struct db_record *rec;
1547         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1548         int ret = 0;
1549
1550         /*
1551          * Skip the __db_sequence_number__ key:
1552          * This is used for persistent transactions internally.
1553          */
1554         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1555             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1556         {
1557                 goto done;
1558         }
1559
1560         /* we have to give them a locked record to prevent races */
1561         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1562         if (rec && rec->value.dsize > 0) {
1563                 ret = state->fn(rec, state->private_data);
1564         }
1565
1566 done:
1567         talloc_free(tmp_ctx);
1568         return ret;
1569 }
1570
1571 /* wrapper to use traverse_persistent_callback with dbwrap */
1572 static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
1573 {
1574         return traverse_persistent_callback(NULL, rec->key, rec->value, data);
1575 }
1576
1577 static int db_ctdbd_traverse(uint32_t db_id,
1578                              void (*fn)(TDB_DATA key, TDB_DATA data,
1579                                         void *private_data),
1580                              void *private_data)
1581 {
1582         struct ctdbd_connection *conn;
1583         int ret;
1584
1585         become_root();
1586         ret = ctdbd_init_connection(talloc_tos(), lp_ctdbd_socket(),
1587                                     lp_ctdb_timeout(), &conn);
1588         unbecome_root();
1589         if (ret != 0) {
1590                 DBG_WARNING("ctdbd_init_connection failed: %s\n",
1591                             strerror(ret));
1592                 return ret;
1593         }
1594
1595         ret = ctdbd_traverse(conn, db_id, fn, private_data);
1596         TALLOC_FREE(conn);
1597
1598         if (ret != 0) {
1599                 DBG_WARNING("ctdbd_traverse failed: %s\n",
1600                             strerror(ret));
1601                 return ret;
1602         }
1603
1604         return 0;
1605 }
1606
1607
1608 static int db_ctdb_traverse(struct db_context *db,
1609                             int (*fn)(struct db_record *rec,
1610                                       void *private_data),
1611                             void *private_data)
1612 {
1613         int ret;
1614         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1615                                                         struct db_ctdb_ctx);
1616         struct traverse_state state;
1617
1618         state.db = db;
1619         state.fn = fn;
1620         state.private_data = private_data;
1621         state.count = 0;
1622
1623         if (db->persistent) {
1624                 struct tdb_context *ltdb = ctx->wtdb->tdb;
1625
1626                 /* for persistent databases we don't need to do a ctdb traverse,
1627                    we can do a faster local traverse */
1628                 ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
1629                 if (ret < 0) {
1630                         return ret;
1631                 }
1632                 if (ctx->transaction && ctx->transaction->m_write) {
1633                         /*
1634                          * we now have to handle keys not yet
1635                          * present at transaction start
1636                          */
1637                         struct db_context *newkeys = db_open_rbt(talloc_tos());
1638                         struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
1639                         struct ctdb_rec_data_old *rec=NULL;
1640                         uint32_t i;
1641                         int count = 0;
1642                         NTSTATUS status;
1643
1644                         if (newkeys == NULL) {
1645                                 return -1;
1646                         }
1647
1648                         for (i=0; i<mbuf->count; i++) {
1649                                 TDB_DATA key;
1650                                 rec = db_ctdb_marshall_loop_next_key(
1651                                         mbuf, rec, &key);
1652                                 SMB_ASSERT(rec != NULL);
1653
1654                                 if (!tdb_exists(ltdb, key)) {
1655                                         dbwrap_store(newkeys, key, tdb_null, 0);
1656                                 }
1657                         }
1658                         status = dbwrap_traverse(newkeys,
1659                                                  traverse_persistent_callback_dbwrap,
1660                                                  &state,
1661                                                  &count);
1662                         talloc_free(newkeys);
1663                         if (!NT_STATUS_IS_OK(status)) {
1664                                 return -1;
1665                         }
1666                         ret += count;
1667                 }
1668                 return ret;
1669         }
1670
1671         ret = db_ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1672         if (ret != 0) {
1673                 return -1;
1674         }
1675         return state.count;
1676 }
1677
1678 static NTSTATUS db_ctdb_storev_deny(struct db_record *rec,
1679                                     const TDB_DATA *dbufs, int num_dbufs, int flag)
1680 {
1681         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1682 }
1683
1684 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1685 {
1686         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1687 }
1688
1689 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1690 {
1691         struct traverse_state *state = (struct traverse_state *)private_data;
1692         struct db_record rec;
1693
1694         ZERO_STRUCT(rec);
1695         rec.db = state->db;
1696         rec.key = key;
1697         rec.value = data;
1698         rec.storev = db_ctdb_storev_deny;
1699         rec.delete_rec = db_ctdb_delete_deny;
1700         rec.private_data = NULL;
1701         state->fn(&rec, state->private_data);
1702         state->count++;
1703 }
1704
1705 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1706                                         void *private_data)
1707 {
1708         struct traverse_state *state = (struct traverse_state *)private_data;
1709         struct db_record rec;
1710
1711         /*
1712          * Skip the __db_sequence_number__ key:
1713          * This is used for persistent transactions internally.
1714          */
1715         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1716             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1717         {
1718                 return 0;
1719         }
1720
1721         ZERO_STRUCT(rec);
1722         rec.db = state->db;
1723         rec.key = kbuf;
1724         rec.value = dbuf;
1725         rec.storev = db_ctdb_storev_deny;
1726         rec.delete_rec = db_ctdb_delete_deny;
1727         rec.private_data = NULL;
1728
1729         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1730                 /* a deleted record */
1731                 return 0;
1732         }
1733         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1734         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1735
1736         state->count++;
1737         return state->fn(&rec, state->private_data);
1738 }
1739
1740 static int db_ctdb_traverse_read(struct db_context *db,
1741                                  int (*fn)(struct db_record *rec,
1742                                            void *private_data),
1743                                  void *private_data)
1744 {
1745         int ret;
1746         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1747                                                         struct db_ctdb_ctx);
1748         struct traverse_state state;
1749
1750         state.db = db;
1751         state.fn = fn;
1752         state.private_data = private_data;
1753         state.count = 0;
1754
1755         if (db->persistent) {
1756                 /* for persistent databases we don't need to do a ctdb traverse,
1757                    we can do a faster local traverse */
1758                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1759         }
1760
1761         ret = db_ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1762         if (ret != 0) {
1763                 return -1;
1764         }
1765         return state.count;
1766 }
1767
1768 static int db_ctdb_get_seqnum(struct db_context *db)
1769 {
1770         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1771                                                         struct db_ctdb_ctx);
1772         return tdb_get_seqnum(ctx->wtdb->tdb);
1773 }
1774
1775 static size_t db_ctdb_id(struct db_context *db, uint8_t *id, size_t idlen)
1776 {
1777         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1778                 db->private_data, struct db_ctdb_ctx);
1779
1780         if (idlen >= sizeof(ctx->db_id)) {
1781                 memcpy(id, &ctx->db_id, sizeof(ctx->db_id));
1782         }
1783
1784         return sizeof(ctx->db_id);
1785 }
1786
1787 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1788                                 struct messaging_context *msg_ctx,
1789                                 const char *name,
1790                                 int hash_size, int tdb_flags,
1791                                 int open_flags, mode_t mode,
1792                                 enum dbwrap_lock_order lock_order,
1793                                 uint64_t dbwrap_flags)
1794 {
1795         struct db_context *result;
1796         struct db_ctdb_ctx *db_ctdb;
1797         char *db_path;
1798         struct loadparm_context *lp_ctx;
1799         TDB_DATA data;
1800         bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
1801         int32_t cstatus;
1802         int ret;
1803
1804         if (!lp_clustering()) {
1805                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1806                 return NULL;
1807         }
1808
1809         if (!(result = talloc_zero(mem_ctx, struct db_context))) {
1810                 DEBUG(0, ("talloc failed\n"));
1811                 TALLOC_FREE(result);
1812                 return NULL;
1813         }
1814
1815         if (!(db_ctdb = talloc(result, struct db_ctdb_ctx))) {
1816                 DEBUG(0, ("talloc failed\n"));
1817                 TALLOC_FREE(result);
1818                 return NULL;
1819         }
1820
1821         result->name = talloc_strdup(result, name);
1822         if (result->name == NULL) {
1823                 DEBUG(0, ("talloc failed\n"));
1824                 TALLOC_FREE(result);
1825                 return NULL;
1826         }
1827
1828         db_ctdb->transaction = NULL;
1829         db_ctdb->db = result;
1830
1831         ret = ctdbd_db_attach(messaging_ctdb_connection(), name,
1832                               &db_ctdb->db_id, persistent);
1833         if (ret != 0) {
1834                 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
1835                           strerror(ret)));
1836                 TALLOC_FREE(result);
1837                 return NULL;
1838         }
1839
1840         if (tdb_flags & TDB_SEQNUM) {
1841                 data.dptr = (uint8_t *)&db_ctdb->db_id;
1842                 data.dsize = sizeof(db_ctdb->db_id);
1843
1844                 ret = ctdbd_control_local(messaging_ctdb_connection(),
1845                                           CTDB_CONTROL_ENABLE_SEQNUM,
1846                                           0, 0, data,
1847                                           NULL, NULL, &cstatus);
1848                 if ((ret != 0) || cstatus != 0) {
1849                         DBG_ERR("ctdb_control for enable seqnum "
1850                                 "failed: %s\n", strerror(ret));
1851                         TALLOC_FREE(result);
1852                         return NULL;
1853                 }
1854         }
1855
1856         db_path = ctdbd_dbpath(messaging_ctdb_connection(), db_ctdb,
1857                                db_ctdb->db_id);
1858
1859         result->persistent = persistent;
1860         result->lock_order = lock_order;
1861
1862         data.dptr = (uint8_t *)&db_ctdb->db_id;
1863         data.dsize = sizeof(db_ctdb->db_id);
1864
1865         ret = ctdbd_control_local(messaging_ctdb_connection(),
1866                                   CTDB_CONTROL_DB_OPEN_FLAGS,
1867                                   0, 0, data, NULL, &data, &cstatus);
1868         if (ret != 0) {
1869                 DBG_ERR(" ctdb control for db_open_flags "
1870                          "failed: %s\n", strerror(ret));
1871                 TALLOC_FREE(result);
1872                 return NULL;
1873         }
1874
1875         if (cstatus != 0 || data.dsize != sizeof(int)) {
1876                 DBG_ERR("ctdb_control for db_open_flags failed\n");
1877                 TALLOC_FREE(result);
1878                 return NULL;
1879         }
1880
1881         tdb_flags = *(int *)data.dptr;
1882
1883         if (!result->persistent) {
1884                 ret = ctdb_async_ctx_init(NULL, messaging_tevent_context(msg_ctx));
1885                 if (ret != 0) {
1886                         DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret));
1887                         TALLOC_FREE(result);
1888                         return NULL;
1889                 }
1890         }
1891
1892         if (!result->persistent &&
1893             (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
1894         {
1895                 TDB_DATA indata;
1896
1897                 indata = make_tdb_data((uint8_t *)&db_ctdb->db_id,
1898                                        sizeof(db_ctdb->db_id));
1899
1900                 ret = ctdbd_control_local(
1901                         messaging_ctdb_connection(),
1902                         CTDB_CONTROL_SET_DB_READONLY, 0, 0,
1903                         indata, NULL, NULL, &cstatus);
1904                 if ((ret != 0) || (cstatus != 0)) {
1905                         DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
1906                                   "%s, %"PRIi32"\n", strerror(ret), cstatus));
1907                         TALLOC_FREE(result);
1908                         return NULL;
1909                 }
1910         }
1911
1912         lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
1913
1914         if (hash_size == 0) {
1915                 hash_size = lpcfg_tdb_hash_size(lp_ctx, db_path);
1916         }
1917
1918         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size,
1919                                       lpcfg_tdb_flags(lp_ctx, tdb_flags),
1920                                       O_RDWR, 0);
1921         talloc_unlink(db_path, lp_ctx);
1922         if (db_ctdb->wtdb == NULL) {
1923                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1924                 TALLOC_FREE(result);
1925                 return NULL;
1926         }
1927         talloc_free(db_path);
1928
1929         /* honor permissions if user has specified O_CREAT */
1930         if (open_flags & O_CREAT) {
1931                 int fd;
1932                 fd = tdb_fd(db_ctdb->wtdb->tdb);
1933                 ret = fchmod(fd, mode);
1934                 if (ret == -1) {
1935                         DBG_WARNING("fchmod failed: %s\n",
1936                                     strerror(errno));
1937                         TALLOC_FREE(result);
1938                         return NULL;
1939                 }
1940         }
1941
1942         if (result->persistent) {
1943                 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb, msg_ctx);
1944                 if (db_ctdb->lock_ctx == NULL) {
1945                         DEBUG(0, ("g_lock_ctx_init failed\n"));
1946                         TALLOC_FREE(result);
1947                         return NULL;
1948                 }
1949         }
1950
1951         db_ctdb->warn_unlock_msecs = lp_parm_int(-1, "ctdb",
1952                                                  "unlock_warn_threshold", 5);
1953         db_ctdb->warn_migrate_attempts = lp_parm_int(-1, "ctdb",
1954                                                      "migrate_attempts", 10);
1955         db_ctdb->warn_migrate_msecs = lp_parm_int(-1, "ctdb",
1956                                                   "migrate_duration", 5000);
1957         db_ctdb->warn_locktime_msecs = lp_ctdb_locktime_warn_threshold();
1958
1959         result->private_data = (void *)db_ctdb;
1960         result->fetch_locked = db_ctdb_fetch_locked;
1961         result->try_fetch_locked = db_ctdb_try_fetch_locked;
1962         result->parse_record = db_ctdb_parse_record;
1963         result->parse_record_send = db_ctdb_parse_record_send;
1964         result->parse_record_recv = db_ctdb_parse_record_recv;
1965         result->traverse = db_ctdb_traverse;
1966         result->traverse_read = db_ctdb_traverse_read;
1967         result->get_seqnum = db_ctdb_get_seqnum;
1968         result->transaction_start = db_ctdb_transaction_start;
1969         result->transaction_commit = db_ctdb_transaction_commit;
1970         result->transaction_cancel = db_ctdb_transaction_cancel;
1971         result->id = db_ctdb_id;
1972
1973         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1974                  name, db_ctdb->db_id));
1975
1976         return result;
1977 }