dbwrap_ctdb: implement parse_record_send()/recv()
[amitay/samba.git] / source3 / lib / dbwrap / dbwrap_ctdb.c
1 /*
2    Unix SMB/CIFS implementation.
3    Database interface wrapper around ctdbd
4    Copyright (C) Volker Lendecke 2007-2009
5    Copyright (C) Michael Adam 2009
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
24 #include "util_tdb.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
29
30 #include "ctdb_private.h"
31 #include "ctdbd_conn.h"
32 #include "dbwrap/dbwrap.h"
33 #include "dbwrap/dbwrap_private.h"
34 #include "dbwrap/dbwrap_ctdb.h"
35 #include "g_lock.h"
36 #include "messages.h"
37 #include "lib/cluster_support.h"
38 #include "lib/util/tevent_ntstatus.h"
39
/*
 * State of one open transaction on a ctdb-backed database.
 */
struct db_ctdb_transaction_handle {
        /* database context this transaction was started on */
        struct db_ctdb_ctx *ctx;
        /*
         * we store the writes done under a transaction:
         */
        struct ctdb_marshall_buffer *m_write;
        /* incremented by nested transaction_start calls, 0 for the outermost */
        uint32_t nesting;
        /* set when a nested level was cancelled; makes a later commit fail */
        bool nested_cancel;
        /* name of the g_lock taken at transaction start */
        char *lock_name;
};
50
/*
 * Private per-database context of the ctdb dbwrap backend.
 */
struct db_ctdb_ctx {
        /* the dbwrap database this context belongs to */
        struct db_context *db;
        /* ctdbd connection used for controls and to query the local vnn */
        struct ctdbd_connection *conn;
        /* local tdb copy of the database */
        struct tdb_wrap *wtdb;
        /* database id passed to ctdbd controls and printed in log messages */
        uint32_t db_id;
        /* currently open transaction, NULL if there is none */
        struct db_ctdb_transaction_handle *transaction;
        /* g_lock context used to take the transaction lock */
        struct g_lock_ctx *lock_ctx;

        /* thresholds for warning messages */
        int warn_unlock_msecs;
        int warn_migrate_msecs;
        int warn_migrate_attempts;
        int warn_locktime_msecs;
};
65
/*
 * Private data attached to a db_record outside of transactions.
 */
struct db_ctdb_rec {
        /* database context the record was fetched from */
        struct db_ctdb_ctx *ctdb_ctx;
        /* ctdb record header, written back together with the data on store */
        struct ctdb_ltdb_header header;
        /* NOTE(review): presumably when the record lock was taken - confirm */
        struct timeval lock_time;
};
71
/*
 * Holder for the ctdbd connection set up by ctdb_async_ctx_init_internal().
 */
struct ctdb_async_ctx {
        /* checked before connecting; when true no new connection is made */
        bool initialized;
        /* the connection created by ctdbd_init_connection() */
        struct ctdbd_connection *async_conn;
};

/* single file-scope instance shared by all users in this file */
static struct ctdb_async_ctx ctdb_async_ctx;
78
79 static int ctdb_async_ctx_init_internal(TALLOC_CTX *mem_ctx,
80                                         struct tevent_context *ev,
81                                         bool reinit)
82 {
83         int ret;
84
85         if (reinit) {
86                 TALLOC_FREE(ctdb_async_ctx.async_conn);
87                 ctdb_async_ctx.initialized = false;
88         }
89
90         if (ctdb_async_ctx.initialized) {
91                 return 0;
92         }
93
94         become_root();
95         ret = ctdbd_init_connection(mem_ctx,
96                                     lp_ctdbd_socket(),
97                                     lp_ctdb_timeout(),
98                                     &ctdb_async_ctx.async_conn);
99         unbecome_root();
100
101         if (ctdb_async_ctx.async_conn == NULL) {
102                 DBG_ERR("ctdbd_init_connection failed\n");
103                 return EIO;
104         }
105
106         ret = ctdbd_setup_fde(ctdb_async_ctx.async_conn, ev);
107         if (ret != 0) {
108                 DBG_ERR("ctdbd_setup_ev failed\n");
109                 return ret;
110         }
111
112         return 0;
113 }
114
115 static int ctdb_async_ctx_init(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
116 {
117         return ctdb_async_ctx_init_internal(mem_ctx, ev, false);
118 }
119
120 int ctdb_async_ctx_reinit(TALLOC_CTX *mem_ctx, struct tevent_context *ev)
121 {
122         return ctdb_async_ctx_init_internal(mem_ctx, ev, true);
123 }
124
125 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
126 {
127         enum TDB_ERROR tret = tdb_error(tdb);
128
129         return map_nt_error_from_tdb(tret);
130 }
131
/*
 * Glue state handed through tdb_parse_record() to db_ctdb_ltdb_parser().
 */
struct db_ctdb_ltdb_parse_state {
        /* caller's callback, invoked with the header and payload split up */
        void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
                       TDB_DATA data, void *private_data);
        /* opaque pointer passed through to parser */
        void *private_data;
};
137
138 static int db_ctdb_ltdb_parser(TDB_DATA key, TDB_DATA data,
139                                void *private_data)
140 {
141         struct db_ctdb_ltdb_parse_state *state =
142                 (struct db_ctdb_ltdb_parse_state *)private_data;
143
144         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
145                 return -1;
146         }
147
148         state->parser(
149                 key, (struct ctdb_ltdb_header *)data.dptr,
150                 make_tdb_data(data.dptr + sizeof(struct ctdb_ltdb_header),
151                               data.dsize - sizeof(struct ctdb_ltdb_header)),
152                 state->private_data);
153         return 0;
154 }
155
156 static NTSTATUS db_ctdb_ltdb_parse(
157         struct db_ctdb_ctx *db, TDB_DATA key,
158         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
159                        TDB_DATA data, void *private_data),
160         void *private_data)
161 {
162         struct db_ctdb_ltdb_parse_state state;
163         int ret;
164
165         state.parser = parser;
166         state.private_data = private_data;
167
168         ret = tdb_parse_record(db->wtdb->tdb, key, db_ctdb_ltdb_parser,
169                                &state);
170         if (ret == -1) {
171                 return NT_STATUS_NOT_FOUND;
172         }
173         return NT_STATUS_OK;
174 }
175
176 /*
177  * Store a record together with the ctdb record header
178  * in the local copy of the database.
179  */
180 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
181                                    TDB_DATA key,
182                                    struct ctdb_ltdb_header *header,
183                                    TDB_DATA data)
184 {
185         TDB_DATA recs[2];
186         int ret;
187
188         recs[0] = (TDB_DATA) { .dptr = (uint8_t *)header,
189                                .dsize = sizeof(struct ctdb_ltdb_header) };
190         recs[1] = data;
191
192         ret = tdb_storev(db->wtdb->tdb, key, recs, 2, TDB_REPLACE);
193
194         return (ret == 0) ? NT_STATUS_OK
195                           : tdb_error_to_ntstatus(db->wtdb->tdb);
196
197 }
198
199 /*
200   form a ctdb_rec_data record from a key/data pair
201  */
202 static struct ctdb_rec_data_old *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
203                                                   TDB_DATA key,
204                                                   struct ctdb_ltdb_header *header,
205                                                   TDB_DATA data)
206 {
207         size_t length;
208         struct ctdb_rec_data_old *d;
209
210         length = offsetof(struct ctdb_rec_data_old, data) + key.dsize +
211                 data.dsize + sizeof(*header);
212         d = (struct ctdb_rec_data_old *)talloc_size(mem_ctx, length);
213         if (d == NULL) {
214                 return NULL;
215         }
216         d->length = length;
217         d->reqid = reqid;
218         d->keylen = key.dsize;
219         memcpy(&d->data[0], key.dptr, key.dsize);
220
221         d->datalen = data.dsize + sizeof(*header);
222         memcpy(&d->data[key.dsize], header, sizeof(*header));
223         memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
224         return d;
225 }
226
227
228 /* helper function for marshalling multiple records */
229 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
230                                                struct ctdb_marshall_buffer *m,
231                                                uint32_t db_id,
232                                                uint32_t reqid,
233                                                TDB_DATA key,
234                                                struct ctdb_ltdb_header *header,
235                                                TDB_DATA data)
236 {
237         struct ctdb_rec_data_old *r;
238         size_t m_size, r_size;
239         struct ctdb_marshall_buffer *m2 = NULL;
240
241         r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
242         if (r == NULL) {
243                 talloc_free(m);
244                 return NULL;
245         }
246
247         if (m == NULL) {
248                 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
249                         mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
250                 if (m == NULL) {
251                         goto done;
252                 }
253                 m->db_id = db_id;
254         }
255
256         m_size = talloc_get_size(m);
257         r_size = talloc_get_size(r);
258
259         m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
260                 mem_ctx, m,  m_size + r_size);
261         if (m2 == NULL) {
262                 talloc_free(m);
263                 goto done;
264         }
265
266         memcpy(m_size + (uint8_t *)m2, r, r_size);
267
268         m2->count++;
269
270 done:
271         talloc_free(r);
272         return m2;
273 }
274
275 /* we've finished marshalling, return a data blob with the marshalled records */
276 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
277 {
278         TDB_DATA data;
279         data.dptr = (uint8_t *)m;
280         data.dsize = talloc_get_size(m);
281         return data;
282 }
283
284 /*
285    loop over a marshalling buffer
286
287      - pass r==NULL to start
288      - loop the number of times indicated by m->count
289 */
290 static struct ctdb_rec_data_old *db_ctdb_marshall_loop_next_key(
291         struct ctdb_marshall_buffer *m, struct ctdb_rec_data_old *r, TDB_DATA *key)
292 {
293         if (r == NULL) {
294                 r = (struct ctdb_rec_data_old *)&m->data[0];
295         } else {
296                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
297         }
298
299         key->dptr   = &r->data[0];
300         key->dsize  = r->keylen;
301         return r;
302 }
303
304 static bool db_ctdb_marshall_buf_parse(
305         struct ctdb_rec_data_old *r, uint32_t *reqid,
306         struct ctdb_ltdb_header **header, TDB_DATA *data)
307 {
308         if (r->datalen < sizeof(struct ctdb_ltdb_header)) {
309                 return false;
310         }
311
312         *reqid = r->reqid;
313
314         data->dptr  = &r->data[r->keylen] + sizeof(struct ctdb_ltdb_header);
315         data->dsize = r->datalen - sizeof(struct ctdb_ltdb_header);
316
317         *header = (struct ctdb_ltdb_header *)&r->data[r->keylen];
318
319         return true;
320 }
321
322 /**
323  * CTDB transaction destructor
324  */
325 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
326 {
327         NTSTATUS status;
328
329         status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
330         if (!NT_STATUS_IS_OK(status)) {
331                 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h->lock_name,
332                           nt_errstr(status)));
333                 return -1;
334         }
335         return 0;
336 }
337
338 /**
339  * CTDB dbwrap API: transaction_start function
340  * starts a transaction on a persistent database
341  */
342 static int db_ctdb_transaction_start(struct db_context *db)
343 {
344         struct db_ctdb_transaction_handle *h;
345         NTSTATUS status;
346         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
347                                                         struct db_ctdb_ctx);
348
349         if (!db->persistent) {
350                 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n", 
351                          ctx->db_id));
352                 return -1;
353         }
354
355         if (ctx->transaction) {
356                 ctx->transaction->nesting++;
357                 DEBUG(5, (__location__ " transaction start on db 0x%08x: nesting %d -> %d\n",
358                           ctx->db_id, ctx->transaction->nesting - 1, ctx->transaction->nesting));
359                 return 0;
360         }
361
362         h = talloc_zero(db, struct db_ctdb_transaction_handle);
363         if (h == NULL) {
364                 DEBUG(0,(__location__ " oom for transaction handle\n"));
365                 return -1;
366         }
367
368         h->ctx = ctx;
369
370         h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
371                                        (unsigned int)ctx->db_id);
372         if (h->lock_name == NULL) {
373                 DEBUG(0, ("talloc_asprintf failed\n"));
374                 TALLOC_FREE(h);
375                 return -1;
376         }
377
378         /*
379          * Wait a day, i.e. forever...
380          */
381         status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
382                              timeval_set(86400, 0));
383         if (!NT_STATUS_IS_OK(status)) {
384                 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
385                 TALLOC_FREE(h);
386                 return -1;
387         }
388
389         talloc_set_destructor(h, db_ctdb_transaction_destructor);
390
391         ctx->transaction = h;
392
393         DEBUG(5,(__location__ " transaction started on db 0x%08x\n", ctx->db_id));
394
395         return 0;
396 }
397
398 static bool parse_newest_in_marshall_buffer(
399         struct ctdb_marshall_buffer *buf, TDB_DATA key,
400         void (*parser)(TDB_DATA key, struct ctdb_ltdb_header *header,
401                        TDB_DATA data, void *private_data),
402         void *private_data)
403 {
404         struct ctdb_rec_data_old *rec = NULL;
405         struct ctdb_ltdb_header *h = NULL;
406         TDB_DATA data;
407         uint32_t i;
408
409         if (buf == NULL) {
410                 return false;
411         }
412
413         /*
414          * Walk the list of records written during this
415          * transaction. If we want to read one we have already
416          * written, return the last written sample. Thus we do not do
417          * a "break;" for the first hit, this record might have been
418          * overwritten later.
419          */
420
421         for (i=0; i<buf->count; i++) {
422                 TDB_DATA tkey;
423                 uint32_t reqid;
424
425                 rec = db_ctdb_marshall_loop_next_key(buf, rec, &tkey);
426                 if (rec == NULL) {
427                         return false;
428                 }
429
430                 if (!tdb_data_equal(key, tkey)) {
431                         continue;
432                 }
433
434                 if (!db_ctdb_marshall_buf_parse(rec, &reqid, &h, &data)) {
435                         return false;
436                 }
437         }
438
439         if (h == NULL) {
440                 return false;
441         }
442
443         parser(key, h, data, private_data);
444
445         return true;
446 }
447
/*
 * Output slots filled by pull_newest_from_marshall_buffer_parser().
 */
struct pull_newest_from_marshall_buffer_state {
        /* if not NULL, receives a copy of the record's ctdb header */
        struct ctdb_ltdb_header *pheader;
        /* talloc parent for the payload copy stored in pdata */
        TALLOC_CTX *mem_ctx;
        /* if not NULL, receives a talloc'ed copy of the record's payload */
        TDB_DATA *pdata;
};
453
454 static void pull_newest_from_marshall_buffer_parser(
455         TDB_DATA key, struct ctdb_ltdb_header *header,
456         TDB_DATA data, void *private_data)
457 {
458         struct pull_newest_from_marshall_buffer_state *state =
459                 (struct pull_newest_from_marshall_buffer_state *)private_data;
460
461         if (state->pheader != NULL) {
462                 memcpy(state->pheader, header, sizeof(*state->pheader));
463         }
464         if (state->pdata != NULL) {
465                 state->pdata->dsize = data.dsize;
466                 state->pdata->dptr = (uint8_t *)talloc_memdup(
467                         state->mem_ctx, data.dptr, data.dsize);
468         }
469 }
470
471 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
472                                              TDB_DATA key,
473                                              struct ctdb_ltdb_header *pheader,
474                                              TALLOC_CTX *mem_ctx,
475                                              TDB_DATA *pdata)
476 {
477         struct pull_newest_from_marshall_buffer_state state;
478
479         state.pheader = pheader;
480         state.mem_ctx = mem_ctx;
481         state.pdata = pdata;
482
483         if (!parse_newest_in_marshall_buffer(
484                     buf, key, pull_newest_from_marshall_buffer_parser,
485                     &state)) {
486                 return false;
487         }
488         if ((pdata != NULL) && (pdata->dsize != 0) && (pdata->dptr == NULL)) {
489                 /* ENOMEM */
490                 return false;
491         }
492         return true;
493 }
494
495 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
496 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
497
498 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
499                                                           TALLOC_CTX *mem_ctx,
500                                                           TDB_DATA key)
501 {
502         struct db_record *result;
503         TDB_DATA ctdb_data;
504
505         if (!(result = talloc(mem_ctx, struct db_record))) {
506                 DEBUG(0, ("talloc failed\n"));
507                 return NULL;
508         }
509
510         result->db = ctx->db;
511         result->private_data = ctx->transaction;
512
513         result->key.dsize = key.dsize;
514         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
515                                                     key.dsize);
516         if (result->key.dptr == NULL) {
517                 DEBUG(0, ("talloc failed\n"));
518                 TALLOC_FREE(result);
519                 return NULL;
520         }
521
522         result->store = db_ctdb_store_transaction;
523         result->delete_rec = db_ctdb_delete_transaction;
524
525         if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
526                                              NULL, result, &result->value)) {
527                 return result;
528         }
529
530         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
531         if (ctdb_data.dptr == NULL) {
532                 /* create the record */
533                 result->value = tdb_null;
534                 return result;
535         }
536
537         result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
538         result->value.dptr = NULL;
539
540         if ((result->value.dsize != 0)
541             && !(result->value.dptr = (uint8_t *)talloc_memdup(
542                          result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
543                          result->value.dsize))) {
544                 DEBUG(0, ("talloc failed\n"));
545                 TALLOC_FREE(result);
546         }
547
548         SAFE_FREE(ctdb_data.dptr);
549
550         return result;
551 }
552
553 static int db_ctdb_record_destructor(struct db_record **recp)
554 {
555         struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
556         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
557                 rec->private_data, struct db_ctdb_transaction_handle);
558         int ret = h->ctx->db->transaction_commit(h->ctx->db);
559         if (ret != 0) {
560                 DEBUG(0,(__location__ " transaction_commit failed\n"));
561         }
562         return 0;
563 }
564
565 /*
566   auto-create a transaction for persistent databases
567  */
568 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
569                                                          TALLOC_CTX *mem_ctx,
570                                                          TDB_DATA key)
571 {
572         int res;
573         struct db_record *rec, **recp;
574
575         res = db_ctdb_transaction_start(ctx->db);
576         if (res == -1) {
577                 return NULL;
578         }
579
580         rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
581         if (rec == NULL) {
582                 ctx->db->transaction_cancel(ctx->db);
583                 return NULL;
584         }
585
586         /* destroy this transaction when we release the lock */
587         recp = talloc(rec, struct db_record *);
588         if (recp == NULL) {
589                 ctx->db->transaction_cancel(ctx->db);
590                 talloc_free(rec);
591                 return NULL;
592         }
593         *recp = rec;
594         talloc_set_destructor(recp, db_ctdb_record_destructor);
595         return rec;
596 }
597
598
599 /*
600   stores a record inside a transaction
601  */
602 static NTSTATUS db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
603                                           TDB_DATA key, TDB_DATA data)
604 {
605         TALLOC_CTX *tmp_ctx = talloc_new(h);
606         TDB_DATA rec;
607         struct ctdb_ltdb_header header;
608
609         ZERO_STRUCT(header);
610
611         /* we need the header so we can update the RSN */
612
613         if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
614                                               NULL, NULL)) {
615
616                 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
617
618                 if (rec.dptr != NULL) {
619                         memcpy(&header, rec.dptr,
620                                sizeof(struct ctdb_ltdb_header));
621                         rec.dsize -= sizeof(struct ctdb_ltdb_header);
622
623                         /*
624                          * a special case, we are writing the same
625                          * data that is there now
626                          */
627                         if (data.dsize == rec.dsize &&
628                             memcmp(data.dptr,
629                                    rec.dptr + sizeof(struct ctdb_ltdb_header),
630                                    data.dsize) == 0) {
631                                 SAFE_FREE(rec.dptr);
632                                 talloc_free(tmp_ctx);
633                                 return NT_STATUS_OK;
634                         }
635                 }
636                 SAFE_FREE(rec.dptr);
637         }
638
639         header.dmaster = ctdbd_vnn(h->ctx->conn);
640         header.rsn++;
641
642         h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
643         if (h->m_write == NULL) {
644                 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
645                 talloc_free(tmp_ctx);
646                 return NT_STATUS_NO_MEMORY;
647         }
648
649         talloc_free(tmp_ctx);
650         return NT_STATUS_OK;
651 }
652
653
654 /* 
655    a record store inside a transaction
656  */
657 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
658 {
659         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
660                 rec->private_data, struct db_ctdb_transaction_handle);
661         NTSTATUS status;
662
663         status = db_ctdb_transaction_store(h, rec->key, data);
664         return status;
665 }
666
667 /*
668    a record delete inside a transaction
669  */
670 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
671 {
672         struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
673                 rec->private_data, struct db_ctdb_transaction_handle);
674         NTSTATUS status;
675
676         status =  db_ctdb_transaction_store(h, rec->key, tdb_null);
677         return status;
678 }
679
680 static void db_ctdb_fetch_db_seqnum_parser(
681         TDB_DATA key, struct ctdb_ltdb_header *header,
682         TDB_DATA data, void *private_data)
683 {
684         uint64_t *seqnum = (uint64_t *)private_data;
685
686         if (data.dsize != sizeof(uint64_t)) {
687                 *seqnum = 0;
688                 return;
689         }
690         memcpy(seqnum, data.dptr, sizeof(*seqnum));
691 }
692
693 /**
694  * Fetch the db sequence number of a persistent db directly from the db.
695  */
696 static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
697                                                 uint64_t *seqnum)
698 {
699         NTSTATUS status;
700         TDB_DATA key;
701
702         if (seqnum == NULL) {
703                 return NT_STATUS_INVALID_PARAMETER;
704         }
705
706         key = string_term_tdb_data(CTDB_DB_SEQNUM_KEY);
707
708         status = db_ctdb_ltdb_parse(
709                 db, key, db_ctdb_fetch_db_seqnum_parser, seqnum);
710
711         if (NT_STATUS_IS_OK(status)) {
712                 return NT_STATUS_OK;
713         }
714         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
715                 *seqnum = 0;
716                 return NT_STATUS_OK;
717         }
718         return status;
719 }
720
721 /**
722  * Store the database sequence number inside a transaction.
723  */
724 static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
725                                         uint64_t seqnum)
726 {
727         NTSTATUS status;
728         const char *keyname = CTDB_DB_SEQNUM_KEY;
729         TDB_DATA key;
730         TDB_DATA data;
731
732         key = string_term_tdb_data(keyname);
733
734         data.dptr = (uint8_t *)&seqnum;
735         data.dsize = sizeof(uint64_t);
736
737         status = db_ctdb_transaction_store(h, key, data);
738
739         return status;
740 }
741
/*
  commit a transaction

  Returns -1 if no transaction is open or if a nested level was cancelled
  before (transaction_cancel is called in that case). A nested commit only
  decrements the nesting counter and returns 0.

  For the outermost commit: if nothing was written, succeed without
  touching the sequence number. Otherwise store the bumped db sequence
  number as the last write, hand the marshalled writes to ctdbd via
  CTDB_CONTROL_TRANS3_COMMIT and, if that fails, use the sequence number
  in the local db to decide between retrying, success and error.

  The transaction handle is freed on every path that reaches "done".
 */
static int db_ctdb_transaction_commit(struct db_context *db)
{
        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                        struct db_ctdb_ctx);
        NTSTATUS rets;
        int32_t status;
        struct db_ctdb_transaction_handle *h = ctx->transaction;
        uint64_t old_seqnum, new_seqnum;
        int ret;

        if (h == NULL) {
                DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
                return -1;
        }

        if (h->nested_cancel) {
                db->transaction_cancel(db);
                DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
                return -1;
        }

        /* inner commit of a nested transaction: only the outermost one acts */
        if (h->nesting != 0) {
                h->nesting--;
                DEBUG(5, (__location__ " transaction commit on db 0x%08x: nesting %d -> %d\n",
                          ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
                return 0;
        }

        if (h->m_write == NULL) {
                /*
                 * No changes were made, so don't change the seqnum,
                 * don't push to other node, just exit with success.
                 */
                ret = 0;
                goto done;
        }

        DEBUG(5,(__location__ " transaction commit on db 0x%08x\n", ctx->db_id));

        /*
         * As the last db action before committing, bump the database sequence
         * number. Note that this undoes all changes to the seqnum records
         * performed under the transaction. This record is not meant to be
         * modified by user interaction. It is for internal use only...
         */
        rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
        if (!NT_STATUS_IS_OK(rets)) {
                DEBUG(1, (__location__ " failed to fetch the db sequence number "
                          "in transaction commit on db 0x%08x\n", ctx->db_id));
                ret = -1;
                goto done;
        }

        new_seqnum = old_seqnum + 1;

        rets = db_ctdb_store_db_seqnum(h, new_seqnum);
        if (!NT_STATUS_IS_OK(rets)) {
                DEBUG(1, (__location__ "failed to store the db sequence number "
                          " in transaction commit on db 0x%08x\n", ctx->db_id));
                ret = -1;
                goto done;
        }

again:
        /* tell ctdbd to commit to the other nodes */
        ret = ctdbd_control_local(ctx->conn, CTDB_CONTROL_TRANS3_COMMIT,
                                  h->ctx->db_id, 0,
                                  db_ctdb_marshall_finish(h->m_write),
                                  NULL, NULL, &status);
        if ((ret != 0) || status != 0) {
                /*
                 * The TRANS3_COMMIT control should only possibly fail when a
                 * recovery has been running concurrently. In any case, the db
                 * will be the same on all nodes, either the new copy or the
                 * old copy.  This can be detected by comparing the old and new
                 * local sequence numbers.
                 */
                rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
                if (!NT_STATUS_IS_OK(rets)) {
                        DEBUG(1, (__location__ " failed to refetch db sequence "
                                  "number after failed TRANS3_COMMIT\n"));
                        ret = -1;
                        goto done;
                }

                if (new_seqnum == old_seqnum) {
                        /* Recovery prevented all our changes: retry. */
                        goto again;
                }
                if (new_seqnum != (old_seqnum + 1)) {
                        DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
                                  "old_seqnum[%lu] + (0 or 1) after failed "
                                  "TRANS3_COMMIT - this should not happen!\n",
                                  (unsigned long)new_seqnum,
                                  (unsigned long)old_seqnum));
                        ret = -1;
                        goto done;
                }
                /*
                 * Recovery propagated our changes to all nodes, completing
                 * our commit for us - succeed.
                 */
        }

        ret = 0;

done:
        /* the transaction is over either way: detach and free the handle */
        h->ctx->transaction = NULL;
        talloc_free(h);
        return ret;
}
856
857
858 /*
859   cancel a transaction
860  */
861 static int db_ctdb_transaction_cancel(struct db_context *db)
862 {
863         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
864                                                         struct db_ctdb_ctx);
865         struct db_ctdb_transaction_handle *h = ctx->transaction;
866
867         if (h == NULL) {
868                 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
869                 return -1;
870         }
871
872         if (h->nesting != 0) {
873                 h->nesting--;
874                 h->nested_cancel = true;
875                 DEBUG(5, (__location__ " transaction cancel on db 0x%08x: nesting %d -> %d\n",
876                           ctx->db_id, ctx->transaction->nesting + 1, ctx->transaction->nesting));
877                 return 0;
878         }
879
880         DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
881
882         ctx->transaction = NULL;
883         talloc_free(h);
884         return 0;
885 }
886
887
888 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
889 {
890         struct db_ctdb_rec *crec = talloc_get_type_abort(
891                 rec->private_data, struct db_ctdb_rec);
892
893         return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
894 }
895
896
897
898 static NTSTATUS db_ctdb_send_schedule_for_deletion(struct db_record *rec)
899 {
900         NTSTATUS status = NT_STATUS_OK;
901         int ret;
902         struct ctdb_control_schedule_for_deletion *dd;
903         TDB_DATA indata;
904         int32_t cstatus;
905         struct db_ctdb_rec *crec = talloc_get_type_abort(
906                 rec->private_data, struct db_ctdb_rec);
907         struct db_ctdb_ctx *ctx = crec->ctdb_ctx;
908
909         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + rec->key.dsize;
910         indata.dptr = talloc_zero_array(crec, uint8_t, indata.dsize);
911         if (indata.dptr == NULL) {
912                 DEBUG(0, (__location__ " talloc failed!\n"));
913                 return NT_STATUS_NO_MEMORY;
914         }
915
916         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
917         dd->db_id = ctx->db_id;
918         dd->hdr = crec->header;
919         dd->keylen = rec->key.dsize;
920         memcpy(dd->key, rec->key.dptr, rec->key.dsize);
921
922         ret = ctdbd_control_local(ctx->conn,
923                                   CTDB_CONTROL_SCHEDULE_FOR_DELETION,
924                                   crec->ctdb_ctx->db_id,
925                                   CTDB_CTRL_FLAG_NOREPLY, /* flags */
926                                   indata,
927                                   NULL, /* outdata */
928                                   NULL, /* errmsg */
929                                   &cstatus);
930         talloc_free(indata.dptr);
931
932         if ((ret != 0) || cstatus != 0) {
933                 DEBUG(1, (__location__ " Error sending local control "
934                           "SCHEDULE_FOR_DELETION: %s, cstatus = %"PRIi32"\n",
935                           strerror(ret), cstatus));
936                 if (ret != 0) {
937                         status = map_nt_error_from_unix(ret);
938                 } else {
939                         status = NT_STATUS_UNSUCCESSFUL;
940                 }
941         }
942
943         return status;
944 }
945
946 static NTSTATUS db_ctdb_delete(struct db_record *rec)
947 {
948         NTSTATUS status;
949
950         /*
951          * We have to store the header with empty data. TODO: Fix the
952          * tdb-level cleanup
953          */
954
955         status = db_ctdb_store(rec, tdb_null, 0);
956         if (!NT_STATUS_IS_OK(status)) {
957                 return status;
958         }
959
960         status = db_ctdb_send_schedule_for_deletion(rec);
961         return status;
962 }
963
964 static int db_ctdb_record_destr(struct db_record* data)
965 {
966         struct db_ctdb_rec *crec = talloc_get_type_abort(
967                 data->private_data, struct db_ctdb_rec);
968         int threshold;
969         int ret;
970         struct timeval before;
971         double timediff;
972
973         DEBUG(10, (DEBUGLEVEL > 10
974                    ? "Unlocking db %u key %s\n"
975                    : "Unlocking db %u key %.20s\n",
976                    (int)crec->ctdb_ctx->db_id,
977                    hex_encode_talloc(data, (unsigned char *)data->key.dptr,
978                               data->key.dsize)));
979
980         before = timeval_current();
981
982         ret = tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key);
983
984         timediff = timeval_elapsed(&before);
985         timediff *= 1000;       /* get us milliseconds */
986
987         if (timediff > crec->ctdb_ctx->warn_unlock_msecs) {
988                 char *key;
989                 key = hex_encode_talloc(talloc_tos(),
990                                         (unsigned char *)data->key.dptr,
991                                         data->key.dsize);
992                 DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
993                           tdb_name(crec->ctdb_ctx->wtdb->tdb), key,
994                           timediff));
995                 TALLOC_FREE(key);
996         }
997
998         if (ret != 0) {
999                 DEBUG(0, ("tdb_chainunlock failed\n"));
1000                 return -1;
1001         }
1002
1003         threshold = crec->ctdb_ctx->warn_locktime_msecs;
1004         if (threshold != 0) {
1005                 timediff = timeval_elapsed(&crec->lock_time) * 1000;
1006                 if (timediff > threshold) {
1007                         const char *key;
1008
1009                         key = hex_encode_talloc(data,
1010                                                 (unsigned char *)data->key.dptr,
1011                                                 data->key.dsize);
1012                         DEBUG(0, ("Held tdb lock on db %s, key %s "
1013                                   "%f milliseconds\n",
1014                                   tdb_name(crec->ctdb_ctx->wtdb->tdb),
1015                                   key, timediff));
1016                 }
1017         }
1018
1019         return 0;
1020 }
1021
1022 /**
1023  * Check whether we have a valid local copy of the given record,
1024  * either for reading or for writing.
1025  */
1026 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header *hdr,
1027                                       uint32_t my_vnn, bool read_only)
1028 {
1029         if (hdr->dmaster != my_vnn) {
1030                 /* If we're not dmaster, it must be r/o copy. */
1031                 return read_only && (hdr->flags & CTDB_REC_RO_HAVE_READONLY);
1032         }
1033
1034         /*
1035          * If we want write access, no one may have r/o copies.
1036          */
1037         return read_only || !(hdr->flags & CTDB_REC_RO_HAVE_DELEGATIONS);
1038 }
1039
1040 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data, uint32_t my_vnn,
1041                                        bool read_only)
1042 {
1043         if (ctdb_data.dptr == NULL) {
1044                 return false;
1045         }
1046
1047         if (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) {
1048                 return false;
1049         }
1050
1051         return db_ctdb_can_use_local_hdr(
1052                 (struct ctdb_ltdb_header *)ctdb_data.dptr, my_vnn, read_only);
1053 }
1054
1055 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
1056                                                TALLOC_CTX *mem_ctx,
1057                                                TDB_DATA key,
1058                                                bool tryonly)
1059 {
1060         struct db_record *result;
1061         struct db_ctdb_rec *crec;
1062         TDB_DATA ctdb_data;
1063         int migrate_attempts;
1064         struct timeval migrate_start;
1065         struct timeval chainlock_start;
1066         struct timeval ctdb_start_time;
1067         double chainlock_time = 0;
1068         double ctdb_time = 0;
1069         int duration_msecs;
1070         int lockret;
1071         int ret;
1072
1073         if (!(result = talloc(mem_ctx, struct db_record))) {
1074                 DEBUG(0, ("talloc failed\n"));
1075                 return NULL;
1076         }
1077
1078         if (!(crec = talloc_zero(result, struct db_ctdb_rec))) {
1079                 DEBUG(0, ("talloc failed\n"));
1080                 TALLOC_FREE(result);
1081                 return NULL;
1082         }
1083
1084         result->db = ctx->db;
1085         result->private_data = (void *)crec;
1086         crec->ctdb_ctx = ctx;
1087
1088         result->key.dsize = key.dsize;
1089         result->key.dptr = (uint8_t *)talloc_memdup(result, key.dptr,
1090                                                     key.dsize);
1091         if (result->key.dptr == NULL) {
1092                 DEBUG(0, ("talloc failed\n"));
1093                 TALLOC_FREE(result);
1094                 return NULL;
1095         }
1096
1097         migrate_attempts = 0;
1098         GetTimeOfDay(&migrate_start);
1099
1100         /*
1101          * Do a blocking lock on the record
1102          */
1103 again:
1104
1105         if (DEBUGLEVEL >= 10) {
1106                 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
1107                 DEBUG(10, (DEBUGLEVEL > 10
1108                            ? "Locking db %u key %s\n"
1109                            : "Locking db %u key %.20s\n",
1110                            (int)crec->ctdb_ctx->db_id, keystr));
1111                 TALLOC_FREE(keystr);
1112         }
1113
1114         GetTimeOfDay(&chainlock_start);
1115         lockret = tryonly
1116                 ? tdb_chainlock_nonblock(ctx->wtdb->tdb, key)
1117                 : tdb_chainlock(ctx->wtdb->tdb, key);
1118         chainlock_time += timeval_elapsed(&chainlock_start);
1119
1120         if (lockret != 0) {
1121                 DEBUG(3, ("tdb_chainlock failed\n"));
1122                 TALLOC_FREE(result);
1123                 return NULL;
1124         }
1125
1126         result->store = db_ctdb_store;
1127         result->delete_rec = db_ctdb_delete;
1128         talloc_set_destructor(result, db_ctdb_record_destr);
1129
1130         ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1131
1132         /*
1133          * See if we have a valid record and we are the dmaster. If so, we can
1134          * take the shortcut and just return it.
1135          */
1136
1137         if (!db_ctdb_can_use_local_copy(ctdb_data, ctdbd_vnn(ctx->conn),
1138                                         false)) {
1139                 SAFE_FREE(ctdb_data.dptr);
1140                 tdb_chainunlock(ctx->wtdb->tdb, key);
1141                 talloc_set_destructor(result, NULL);
1142
1143                 if (tryonly && (migrate_attempts != 0)) {
1144                         DEBUG(5, ("record migrated away again\n"));
1145                         TALLOC_FREE(result);
1146                         return NULL;
1147                 }
1148
1149                 migrate_attempts += 1;
1150
1151                 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %"PRIu32" "
1152                            "(%"PRIu32") %"PRIu32"\n",
1153                            ctdb_data.dptr, ctdb_data.dptr ?
1154                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster :
1155                            UINT32_MAX,
1156                            ctdbd_vnn(ctx->conn),
1157                            ctdb_data.dptr ?
1158                            ((struct ctdb_ltdb_header *)ctdb_data.dptr)->flags : 0));
1159
1160                 GetTimeOfDay(&ctdb_start_time);
1161                 ret = ctdbd_migrate(ctx->conn, ctx->db_id, key);
1162                 ctdb_time += timeval_elapsed(&ctdb_start_time);
1163
1164                 if (ret != 0) {
1165                         DEBUG(5, ("ctdb_migrate failed: %s\n",
1166                                   strerror(ret)));
1167                         TALLOC_FREE(result);
1168                         return NULL;
1169                 }
1170                 /* now its migrated, try again */
1171                 goto again;
1172         }
1173
1174         {
1175                 double duration;
1176                 duration = timeval_elapsed(&migrate_start);
1177
1178                 /*
1179                  * Convert the duration to milliseconds to avoid a
1180                  * floating-point division of
1181                  * lp_parm_int("migrate_duration") by 1000.
1182                  */
1183                 duration_msecs = duration * 1000;
1184         }
1185
1186         if ((migrate_attempts > ctx->warn_migrate_attempts) ||
1187             (duration_msecs > ctx->warn_migrate_msecs)) {
1188                 int chain = 0;
1189
1190                 if (tdb_get_flags(ctx->wtdb->tdb) & TDB_INCOMPATIBLE_HASH) {
1191                         chain = tdb_jenkins_hash(&key) %
1192                                 tdb_hash_size(ctx->wtdb->tdb);
1193                 }
1194
1195                 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
1196                           "needed %d attempts, %d milliseconds, "
1197                           "chainlock: %f ms, CTDB %f ms\n",
1198                           tdb_name(ctx->wtdb->tdb),
1199                           hex_encode_talloc(talloc_tos(),
1200                                             (unsigned char *)key.dptr,
1201                                             key.dsize),
1202                           chain,
1203                           migrate_attempts, duration_msecs,
1204                           chainlock_time * 1000.0,
1205                           ctdb_time * 1000.0));
1206         }
1207
1208         GetTimeOfDay(&crec->lock_time);
1209
1210         memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
1211
1212         result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
1213         result->value.dptr = NULL;
1214
1215         if ((result->value.dsize != 0)
1216             && !(result->value.dptr = (uint8_t *)talloc_memdup(
1217                          result, ctdb_data.dptr + sizeof(crec->header),
1218                          result->value.dsize))) {
1219                 DEBUG(0, ("talloc failed\n"));
1220                 TALLOC_FREE(result);
1221         }
1222
1223         SAFE_FREE(ctdb_data.dptr);
1224
1225         return result;
1226 }
1227
1228 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
1229                                               TALLOC_CTX *mem_ctx,
1230                                               TDB_DATA key)
1231 {
1232         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1233                                                         struct db_ctdb_ctx);
1234
1235         if (ctx->transaction != NULL) {
1236                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1237         }
1238
1239         if (db->persistent) {
1240                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1241         }
1242
1243         return fetch_locked_internal(ctx, mem_ctx, key, false);
1244 }
1245
1246 static struct db_record *db_ctdb_try_fetch_locked(struct db_context *db,
1247                                                   TALLOC_CTX *mem_ctx,
1248                                                   TDB_DATA key)
1249 {
1250         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1251                                                         struct db_ctdb_ctx);
1252
1253         if (ctx->transaction != NULL) {
1254                 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
1255         }
1256
1257         if (db->persistent) {
1258                 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
1259         }
1260
1261         return fetch_locked_internal(ctx, mem_ctx, key, true);
1262 }
1263
1264 struct db_ctdb_parse_record_state {
1265         void (*parser)(TDB_DATA key, TDB_DATA data, void *private_data);
1266         void *private_data;
1267         uint32_t my_vnn;
1268         bool ask_for_readonly_copy;
1269         bool done;
1270         bool empty_record;
1271 };
1272
1273 static void db_ctdb_parse_record_parser(
1274         TDB_DATA key, struct ctdb_ltdb_header *header,
1275         TDB_DATA data, void *private_data)
1276 {
1277         struct db_ctdb_parse_record_state *state =
1278                 (struct db_ctdb_parse_record_state *)private_data;
1279         state->parser(key, data, state->private_data);
1280 }
1281
1282 static void db_ctdb_parse_record_parser_nonpersistent(
1283         TDB_DATA key, struct ctdb_ltdb_header *header,
1284         TDB_DATA data, void *private_data)
1285 {
1286         struct db_ctdb_parse_record_state *state =
1287                 (struct db_ctdb_parse_record_state *)private_data;
1288
1289         if (db_ctdb_can_use_local_hdr(header, state->my_vnn, true)) {
1290                 /*
1291                  * A record consisting only of the ctdb header can be
1292                  * a validly created empty record or a tombstone
1293                  * record of a deleted record (not vacuumed yet). Mark
1294                  * it accordingly.
1295                  */
1296                 state->empty_record = (data.dsize == 0);
1297                 if (!state->empty_record) {
1298                         state->parser(key, data, state->private_data);
1299                 }
1300                 state->done = true;
1301         } else {
1302                 /*
1303                  * We found something in the db, so it seems that this record,
1304                  * while not usable locally right now, is popular. Ask for a
1305                  * R/O copy.
1306                  */
1307                 state->ask_for_readonly_copy = true;
1308         }
1309 }
1310
1311 static NTSTATUS db_ctdb_try_parse_local_record(struct db_ctdb_ctx *ctx,
1312                                                TDB_DATA key,
1313                                                struct db_ctdb_parse_record_state *state)
1314 {
1315         NTSTATUS status;
1316
1317         if (ctx->transaction != NULL) {
1318                 struct db_ctdb_transaction_handle *h = ctx->transaction;
1319                 bool found;
1320
1321                 /*
1322                  * Transactions only happen for persistent db's.
1323                  */
1324
1325                 found = parse_newest_in_marshall_buffer(
1326                         h->m_write, key, db_ctdb_parse_record_parser, state);
1327
1328                 if (found) {
1329                         return NT_STATUS_OK;
1330                 }
1331         }
1332
1333         if (ctx->db->persistent) {
1334                 /*
1335                  * Persistent db, but not found in the transaction buffer
1336                  */
1337                 return db_ctdb_ltdb_parse(
1338                         ctx, key, db_ctdb_parse_record_parser, state);
1339         }
1340
1341         state->done = false;
1342         state->ask_for_readonly_copy = false;
1343
1344         status = db_ctdb_ltdb_parse(
1345                 ctx, key, db_ctdb_parse_record_parser_nonpersistent, state);
1346         if (NT_STATUS_IS_OK(status) && state->done) {
1347                 if (state->empty_record) {
1348                         /*
1349                          * We know authoritatively, that this is an empty
1350                          * record. Since ctdb does not distinguish between empty
1351                          * and deleted records, this can be a record stored as
1352                          * empty or a not-yet-vacuumed tombstone record of a
1353                          * deleted record. Now Samba right now can live without
1354                          * empty records, so we can safely report this record
1355                          * as non-existing.
1356                          *
1357                          * See bugs 10008 and 12005.
1358                          */
1359                         return NT_STATUS_NOT_FOUND;
1360                 }
1361                 return NT_STATUS_OK;
1362         }
1363
1364         return NT_STATUS_MORE_PROCESSING_REQUIRED;
1365 }
1366
1367 static NTSTATUS db_ctdb_parse_record(struct db_context *db, TDB_DATA key,
1368                                      void (*parser)(TDB_DATA key,
1369                                                     TDB_DATA data,
1370                                                     void *private_data),
1371                                      void *private_data)
1372 {
1373         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1374                 db->private_data, struct db_ctdb_ctx);
1375         struct db_ctdb_parse_record_state state;
1376         NTSTATUS status;
1377         int ret;
1378
1379         state.parser = parser;
1380         state.private_data = private_data;
1381         state.my_vnn = ctdbd_vnn(ctx->conn);
1382         state.empty_record = false;
1383
1384         status = db_ctdb_try_parse_local_record(ctx, key, &state);
1385         if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
1386                 return status;
1387         }
1388
1389         ret = ctdbd_parse(ctx->conn, ctx->db_id, key,
1390                           state.ask_for_readonly_copy, parser, private_data);
1391         if (ret != 0) {
1392                 if (ret == ENOENT) {
1393                         /*
1394                          * This maps to
1395                          * NT_STATUS_OBJECT_NAME_NOT_FOUND. Our upper
1396                          * layers expect NT_STATUS_NOT_FOUND for "no
1397                          * record around". We need to convert dbwrap
1398                          * to 0/errno away from NTSTATUS ... :-)
1399                          */
1400                         return NT_STATUS_NOT_FOUND;
1401                 }
1402                 return map_nt_error_from_unix(ret);
1403         }
1404         return NT_STATUS_OK;
1405 }
1406
1407 static void db_ctdb_parse_record_done(struct tevent_req *subreq);
1408
1409 static struct tevent_req *db_ctdb_parse_record_send(
1410         TALLOC_CTX *mem_ctx,
1411         struct tevent_context *ev,
1412         struct db_context *db,
1413         TDB_DATA key,
1414         void (*parser)(TDB_DATA key,
1415                        TDB_DATA data,
1416                        void *private_data),
1417         void *private_data,
1418         enum dbwrap_req_state *req_state)
1419 {
1420         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1421                 db->private_data, struct db_ctdb_ctx);
1422         struct tevent_req *req = NULL;
1423         struct tevent_req *subreq = NULL;
1424         struct db_ctdb_parse_record_state *state = NULL;
1425         NTSTATUS status;
1426
1427         req = tevent_req_create(mem_ctx, &state,
1428                                 struct db_ctdb_parse_record_state);
1429         if (req == NULL) {
1430                 *req_state = DBWRAP_REQ_ERROR;
1431                 return NULL;
1432
1433         }
1434
1435         *state = (struct db_ctdb_parse_record_state) {
1436                 .parser = parser,
1437                 .private_data = private_data,
1438                 .my_vnn = ctdbd_vnn(ctx->conn),
1439                 .empty_record = false,
1440         };
1441
1442         status = db_ctdb_try_parse_local_record(ctx, key, state);
1443         if (!NT_STATUS_EQUAL(status, NT_STATUS_MORE_PROCESSING_REQUIRED)) {
1444                 if (tevent_req_nterror(req, status)) {
1445                         *req_state = DBWRAP_REQ_ERROR;
1446                         return tevent_req_post(req, ev);
1447                 }
1448                 *req_state = DBWRAP_REQ_DONE;
1449                 tevent_req_done(req);
1450                 return tevent_req_post(req, ev);
1451         }
1452
1453         subreq = ctdbd_parse_send(state,
1454                                   ev,
1455                                   ctdb_async_ctx.async_conn,
1456                                   ctx->db_id,
1457                                   key,
1458                                   state->ask_for_readonly_copy,
1459                                   parser,
1460                                   private_data,
1461                                   req_state);
1462         if (tevent_req_nomem(subreq, req)) {
1463                 *req_state = DBWRAP_REQ_ERROR;
1464                 return tevent_req_post(req, ev);
1465         }
1466         tevent_req_set_callback(subreq, db_ctdb_parse_record_done, req);
1467
1468         return req;
1469 }
1470
1471 static void db_ctdb_parse_record_done(struct tevent_req *subreq)
1472 {
1473         struct tevent_req *req = tevent_req_callback_data(
1474                 subreq, struct tevent_req);
1475         int ret;
1476
1477         ret = ctdbd_parse_recv(subreq);
1478         TALLOC_FREE(subreq);
1479         if (ret != 0) {
1480                 if (ret == ENOENT) {
1481                         /*
1482                          * This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
1483                          * upper layers expect NT_STATUS_NOT_FOUND for "no
1484                          * record around". We need to convert dbwrap to 0/errno
1485                          * away from NTSTATUS ... :-)
1486                          */
1487                         tevent_req_nterror(req, NT_STATUS_NOT_FOUND);
1488                         return;
1489                 }
1490                 tevent_req_nterror(req, map_nt_error_from_unix(ret));
1491                 return;
1492         }
1493
1494         tevent_req_done(req);
1495         return;
1496 }
1497
1498 static NTSTATUS db_ctdb_parse_record_recv(struct tevent_req *req)
1499 {
1500         return tevent_req_simple_recv_ntstatus(req);
1501 }
1502
/*
 * State handed to the traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller-supplied per-record function and its argument */
	int (*fn)(struct db_record *rec, void *private_data);
	void *private_data;
	/* record counter maintained by the traverse callbacks */
	int count;
};
1509
1510 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1511 {
1512         struct traverse_state *state = (struct traverse_state *)private_data;
1513         struct db_record *rec;
1514         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1515         /* we have to give them a locked record to prevent races */
1516         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1517         if (rec && rec->value.dsize > 0) {
1518                 state->fn(rec, state->private_data);
1519         }
1520         talloc_free(tmp_ctx);
1521 }
1522
1523 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1524                                         void *private_data)
1525 {
1526         struct traverse_state *state = (struct traverse_state *)private_data;
1527         struct db_record *rec;
1528         TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1529         int ret = 0;
1530
1531         /*
1532          * Skip the __db_sequence_number__ key:
1533          * This is used for persistent transactions internally.
1534          */
1535         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1536             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1537         {
1538                 goto done;
1539         }
1540
1541         /* we have to give them a locked record to prevent races */
1542         rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1543         if (rec && rec->value.dsize > 0) {
1544                 ret = state->fn(rec, state->private_data);
1545         }
1546
1547 done:
1548         talloc_free(tmp_ctx);
1549         return ret;
1550 }
1551
1552 /* wrapper to use traverse_persistent_callback with dbwrap */
1553 static int traverse_persistent_callback_dbwrap(struct db_record *rec, void* data)
1554 {
1555         return traverse_persistent_callback(NULL, rec->key, rec->value, data);
1556 }
1557
1558 static int db_ctdbd_traverse(uint32_t db_id,
1559                              void (*fn)(TDB_DATA key, TDB_DATA data,
1560                                         void *private_data),
1561                              void *private_data)
1562 {
1563         struct ctdbd_connection *conn;
1564         int ret;
1565
1566         become_root();
1567         ret = ctdbd_init_connection(talloc_tos(), lp_ctdbd_socket(),
1568                                     lp_ctdb_timeout(), &conn);
1569         unbecome_root();
1570         if (ret != 0) {
1571                 DBG_WARNING("ctdbd_init_connection failed: %s\n",
1572                             strerror(ret));
1573                 return ret;
1574         }
1575
1576         ret = ctdbd_traverse(conn, db_id, fn, private_data);
1577         TALLOC_FREE(conn);
1578
1579         if (ret != 0) {
1580                 DBG_WARNING("ctdbd_traverse failed: %s\n",
1581                             strerror(ret));
1582                 return ret;
1583         }
1584
1585         return 0;
1586 }
1587
1588
1589 static int db_ctdb_traverse(struct db_context *db,
1590                             int (*fn)(struct db_record *rec,
1591                                       void *private_data),
1592                             void *private_data)
1593 {
1594         int ret;
1595         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1596                                                         struct db_ctdb_ctx);
1597         struct traverse_state state;
1598
1599         state.db = db;
1600         state.fn = fn;
1601         state.private_data = private_data;
1602         state.count = 0;
1603
1604         if (db->persistent) {
1605                 struct tdb_context *ltdb = ctx->wtdb->tdb;
1606
1607                 /* for persistent databases we don't need to do a ctdb traverse,
1608                    we can do a faster local traverse */
1609                 ret = tdb_traverse(ltdb, traverse_persistent_callback, &state);
1610                 if (ret < 0) {
1611                         return ret;
1612                 }
1613                 if (ctx->transaction && ctx->transaction->m_write) {
1614                         /*
1615                          * we now have to handle keys not yet
1616                          * present at transaction start
1617                          */
1618                         struct db_context *newkeys = db_open_rbt(talloc_tos());
1619                         struct ctdb_marshall_buffer *mbuf = ctx->transaction->m_write;
1620                         struct ctdb_rec_data_old *rec=NULL;
1621                         uint32_t i;
1622                         int count = 0;
1623                         NTSTATUS status;
1624
1625                         if (newkeys == NULL) {
1626                                 return -1;
1627                         }
1628
1629                         for (i=0; i<mbuf->count; i++) {
1630                                 TDB_DATA key;
1631                                 rec = db_ctdb_marshall_loop_next_key(
1632                                         mbuf, rec, &key);
1633                                 SMB_ASSERT(rec != NULL);
1634
1635                                 if (!tdb_exists(ltdb, key)) {
1636                                         dbwrap_store(newkeys, key, tdb_null, 0);
1637                                 }
1638                         }
1639                         status = dbwrap_traverse(newkeys,
1640                                                  traverse_persistent_callback_dbwrap,
1641                                                  &state,
1642                                                  &count);
1643                         talloc_free(newkeys);
1644                         if (!NT_STATUS_IS_OK(status)) {
1645                                 return -1;
1646                         }
1647                         ret += count;
1648                 }
1649                 return ret;
1650         }
1651
1652         ret = db_ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1653         if (ret != 0) {
1654                 return -1;
1655         }
1656         return state.count;
1657 }
1658
1659 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1660 {
1661         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1662 }
1663
1664 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1665 {
1666         return NT_STATUS_MEDIA_WRITE_PROTECTED;
1667 }
1668
1669 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1670 {
1671         struct traverse_state *state = (struct traverse_state *)private_data;
1672         struct db_record rec;
1673
1674         ZERO_STRUCT(rec);
1675         rec.db = state->db;
1676         rec.key = key;
1677         rec.value = data;
1678         rec.store = db_ctdb_store_deny;
1679         rec.delete_rec = db_ctdb_delete_deny;
1680         rec.private_data = NULL;
1681         state->fn(&rec, state->private_data);
1682         state->count++;
1683 }
1684
1685 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1686                                         void *private_data)
1687 {
1688         struct traverse_state *state = (struct traverse_state *)private_data;
1689         struct db_record rec;
1690
1691         /*
1692          * Skip the __db_sequence_number__ key:
1693          * This is used for persistent transactions internally.
1694          */
1695         if (kbuf.dsize == strlen(CTDB_DB_SEQNUM_KEY) + 1 &&
1696             strcmp((const char*)kbuf.dptr, CTDB_DB_SEQNUM_KEY) == 0)
1697         {
1698                 return 0;
1699         }
1700
1701         ZERO_STRUCT(rec);
1702         rec.db = state->db;
1703         rec.key = kbuf;
1704         rec.value = dbuf;
1705         rec.store = db_ctdb_store_deny;
1706         rec.delete_rec = db_ctdb_delete_deny;
1707         rec.private_data = NULL;
1708
1709         if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1710                 /* a deleted record */
1711                 return 0;
1712         }
1713         rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1714         rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1715
1716         state->count++;
1717         return state->fn(&rec, state->private_data);
1718 }
1719
1720 static int db_ctdb_traverse_read(struct db_context *db,
1721                                  int (*fn)(struct db_record *rec,
1722                                            void *private_data),
1723                                  void *private_data)
1724 {
1725         int ret;
1726         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1727                                                         struct db_ctdb_ctx);
1728         struct traverse_state state;
1729
1730         state.db = db;
1731         state.fn = fn;
1732         state.private_data = private_data;
1733         state.count = 0;
1734
1735         if (db->persistent) {
1736                 /* for persistent databases we don't need to do a ctdb traverse,
1737                    we can do a faster local traverse */
1738                 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1739         }
1740
1741         ret = db_ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1742         if (ret != 0) {
1743                 return -1;
1744         }
1745         return state.count;
1746 }
1747
1748 static int db_ctdb_get_seqnum(struct db_context *db)
1749 {
1750         struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1751                                                         struct db_ctdb_ctx);
1752         return tdb_get_seqnum(ctx->wtdb->tdb);
1753 }
1754
1755 static size_t db_ctdb_id(struct db_context *db, uint8_t *id, size_t idlen)
1756 {
1757         struct db_ctdb_ctx *ctx = talloc_get_type_abort(
1758                 db->private_data, struct db_ctdb_ctx);
1759
1760         if (idlen >= sizeof(ctx->db_id)) {
1761                 memcpy(id, &ctx->db_id, sizeof(ctx->db_id));
1762         }
1763
1764         return sizeof(ctx->db_id);
1765 }
1766
1767 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1768                                 struct messaging_context *msg_ctx,
1769                                 struct ctdbd_connection *conn,
1770                                 const char *name,
1771                                 int hash_size, int tdb_flags,
1772                                 int open_flags, mode_t mode,
1773                                 enum dbwrap_lock_order lock_order,
1774                                 uint64_t dbwrap_flags)
1775 {
1776         struct db_context *result;
1777         struct db_ctdb_ctx *db_ctdb;
1778         char *db_path;
1779         struct loadparm_context *lp_ctx;
1780         int32_t cstatus;
1781         int ret;
1782
1783         if (!lp_clustering()) {
1784                 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1785                 return NULL;
1786         }
1787
1788         if (!(result = talloc_zero(mem_ctx, struct db_context))) {
1789                 DEBUG(0, ("talloc failed\n"));
1790                 TALLOC_FREE(result);
1791                 return NULL;
1792         }
1793
1794         if (!(db_ctdb = talloc(result, struct db_ctdb_ctx))) {
1795                 DEBUG(0, ("talloc failed\n"));
1796                 TALLOC_FREE(result);
1797                 return NULL;
1798         }
1799
1800         result->name = talloc_strdup(result, name);
1801         if (result->name == NULL) {
1802                 DEBUG(0, ("talloc failed\n"));
1803                 TALLOC_FREE(result);
1804                 return NULL;
1805         }
1806
1807         db_ctdb->transaction = NULL;
1808         db_ctdb->db = result;
1809         db_ctdb->conn = conn;
1810
1811         ret = ctdbd_db_attach(db_ctdb->conn, name, &db_ctdb->db_id, tdb_flags);
1812         if (ret != 0) {
1813                 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name,
1814                           strerror(ret)));
1815                 TALLOC_FREE(result);
1816                 return NULL;
1817         }
1818
1819         db_path = ctdbd_dbpath(db_ctdb->conn, db_ctdb, db_ctdb->db_id);
1820
1821         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1822         result->lock_order = lock_order;
1823
1824         /* only pass through specific flags */
1825         tdb_flags &= TDB_SEQNUM|TDB_VOLATILE|
1826                 TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST;
1827
1828         if (!result->persistent) {
1829                 ret = ctdb_async_ctx_init(NULL, messaging_tevent_context(msg_ctx));
1830                 if (ret != 0) {
1831                         DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret));
1832                         TALLOC_FREE(result);
1833                         return NULL;
1834                 }
1835         }
1836
1837         if (!result->persistent &&
1838             (dbwrap_flags & DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS))
1839         {
1840                 TDB_DATA indata;
1841
1842                 indata = make_tdb_data((uint8_t *)&db_ctdb->db_id,
1843                                        sizeof(db_ctdb->db_id));
1844
1845                 ret = ctdbd_control_local(
1846                         db_ctdb->conn, CTDB_CONTROL_SET_DB_READONLY, 0, 0,
1847                         indata, NULL, NULL, &cstatus);
1848                 if ((ret != 0) || (cstatus != 0)) {
1849                         DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
1850                                   "%s, %"PRIi32"\n", strerror(ret), cstatus));
1851                         TALLOC_FREE(result);
1852                         return NULL;
1853                 }
1854         }
1855
1856         lp_ctx = loadparm_init_s3(db_path, loadparm_s3_helpers());
1857
1858         if (hash_size == 0) {
1859                 hash_size = lpcfg_tdb_hash_size(lp_ctx, db_path);
1860         }
1861
1862         db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size,
1863                                       lpcfg_tdb_flags(lp_ctx, tdb_flags),
1864                                       O_RDWR, 0);
1865         talloc_unlink(db_path, lp_ctx);
1866         if (db_ctdb->wtdb == NULL) {
1867                 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1868                 TALLOC_FREE(result);
1869                 return NULL;
1870         }
1871         talloc_free(db_path);
1872
1873         /* honor permissions if user has specified O_CREAT */
1874         if (open_flags & O_CREAT) {
1875                 int fd;
1876                 fd = tdb_fd(db_ctdb->wtdb->tdb);
1877                 ret = fchmod(fd, mode);
1878                 if (ret == -1) {
1879                         DBG_WARNING("fchmod failed: %s\n",
1880                                     strerror(errno));
1881                         TALLOC_FREE(result);
1882                         return NULL;
1883                 }
1884         }
1885
1886         if (result->persistent) {
1887                 db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb, msg_ctx);
1888                 if (db_ctdb->lock_ctx == NULL) {
1889                         DEBUG(0, ("g_lock_ctx_init failed\n"));
1890                         TALLOC_FREE(result);
1891                         return NULL;
1892                 }
1893         }
1894
1895         db_ctdb->warn_unlock_msecs = lp_parm_int(-1, "ctdb",
1896                                                  "unlock_warn_threshold", 5);
1897         db_ctdb->warn_migrate_attempts = lp_parm_int(-1, "ctdb",
1898                                                      "migrate_attempts", 10);
1899         db_ctdb->warn_migrate_msecs = lp_parm_int(-1, "ctdb",
1900                                                   "migrate_duration", 5000);
1901         db_ctdb->warn_locktime_msecs = lp_ctdb_locktime_warn_threshold();
1902
1903         result->private_data = (void *)db_ctdb;
1904         result->fetch_locked = db_ctdb_fetch_locked;
1905         result->try_fetch_locked = db_ctdb_try_fetch_locked;
1906         result->parse_record = db_ctdb_parse_record;
1907         result->parse_record_send = db_ctdb_parse_record_send;
1908         result->parse_record_recv = db_ctdb_parse_record_recv;
1909         result->traverse = db_ctdb_traverse;
1910         result->traverse_read = db_ctdb_traverse_read;
1911         result->get_seqnum = db_ctdb_get_seqnum;
1912         result->transaction_start = db_ctdb_transaction_start;
1913         result->transaction_commit = db_ctdb_transaction_commit;
1914         result->transaction_cancel = db_ctdb_transaction_cancel;
1915         result->id = db_ctdb_id;
1916
1917         DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1918                  name, db_ctdb->db_id));
1919
1920         return result;
1921 }