netapi: implement NetGroupDel_r().
[kai/samba.git] / source / lib / dbwrap_tdb2.c
1 /*
2    Unix SMB/CIFS implementation.
3
4    Database interface wrapper around tdb/ctdb
5
6    Copyright (C) Volker Lendecke 2005-2007
7    Copyright (C) Stefan Metzmacher 2008
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "librpc/gen_ndr/ndr_messaging.h"
25
26 struct db_tdb2_ctx {
27         struct db_context *db;
28         const char *name;
29         struct tdb_wrap *mtdb;
30         const char *mtdb_path;
31         bool master_transaction;
32         struct {
33                 int hash_size;
34                 int tdb_flags;
35                 int open_flags;
36                 mode_t mode;
37         } open;
38         struct tdb_wrap *ltdb;
39         const char *ltdb_path;
40         bool local_transaction;
41         int transaction;
42         bool out_of_sync;
43         uint32_t lseqnum;
44         uint32_t mseqnum;
45 #define DB_TDB2_MASTER_SEQNUM_KEYSTR "DB_TDB2_MASTER_SEQNUM_KEYSTR"
46         TDB_DATA mseqkey;
47         uint32_t max_buffer_size;
48         uint32_t current_buffer_size;
49         struct dbwrap_tdb2_changes changes;
50 };
51
52
53 static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag);
54 static NTSTATUS db_tdb2_delete(struct db_record *rec);
55
56 static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key);
57 static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx);
58
59 static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
60                                           const char *name,
61                                           int hash_size, int tdb_flags,
62                                           int open_flags, mode_t mode,
63                                           const struct dbwrap_tdb2_changes *chgs);
64
65 static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
66                                     const struct dbwrap_tdb2_changes *changes);
67
68 static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
69                                const struct dbwrap_tdb2_changes *changes);
70 static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum);
71 static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx);
72 static int db_tdb2_transaction_cancel(struct db_context *db);
73
74 static void db_tdb2_receive_changes(struct messaging_context *msg,
75                                     void *private_data,
76                                     uint32_t msg_type,
77                                     struct server_id server_id,
78                                     DATA_BLOB *data);
79
80 static struct messaging_context *global_tdb2_msg_ctx;
81 static bool global_tdb2_msg_ctx_initialized;
82
83 void db_tdb2_setup_messaging(struct messaging_context *msg_ctx, bool server)
84 {
85         global_tdb2_msg_ctx = msg_ctx;
86
87         global_tdb2_msg_ctx_initialized = true;
88
89         if (!server) {
90                 return;
91         }
92
93         if (!lp_parm_bool(-1, "dbwrap", "use_tdb2", false)) {
94                 return;
95         }
96
97         messaging_register(msg_ctx, NULL, MSG_DBWRAP_TDB2_CHANGES,
98                            db_tdb2_receive_changes);
99 }
100
101 static struct messaging_context *db_tdb2_get_global_messaging_context(void)
102 {
103         struct messaging_context *msg_ctx;
104
105         if (global_tdb2_msg_ctx_initialized) {
106                 return global_tdb2_msg_ctx;
107         }
108
109         msg_ctx = messaging_init(NULL, procid_self(),
110                                  event_context_init(NULL));
111
112         db_tdb2_setup_messaging(msg_ctx, false);
113
114         return global_tdb2_msg_ctx;
115 }
116
117 struct tdb_fetch_locked_state {
118         TALLOC_CTX *mem_ctx;
119         struct db_record *result;
120 };
121
122 static int db_tdb2_fetchlock_parse(TDB_DATA key, TDB_DATA data,
123                                   void *private_data)
124 {
125         struct tdb_fetch_locked_state *state =
126                 (struct tdb_fetch_locked_state *)private_data;
127
128         state->result = (struct db_record *)talloc_size(
129                 state->mem_ctx,
130                 sizeof(struct db_record) + key.dsize + data.dsize);
131
132         if (state->result == NULL) {
133                 return 0;
134         }
135
136         state->result->key.dsize = key.dsize;
137         state->result->key.dptr = ((uint8 *)state->result)
138                 + sizeof(struct db_record);
139         memcpy(state->result->key.dptr, key.dptr, key.dsize);
140
141         state->result->value.dsize = data.dsize;
142
143         if (data.dsize > 0) {
144                 state->result->value.dptr = state->result->key.dptr+key.dsize;
145                 memcpy(state->result->value.dptr, data.dptr, data.dsize);
146         }
147         else {
148                 state->result->value.dptr = NULL;
149         }
150
151         return 0;
152 }
153
154 static struct db_record *db_tdb2_fetch_locked(struct db_context *db,
155                                               TALLOC_CTX *mem_ctx, TDB_DATA key)
156 {
157         struct db_tdb2_ctx *ctx = talloc_get_type_abort(db->private_data,
158                                                         struct db_tdb2_ctx);
159         struct tdb_fetch_locked_state state;
160
161         /* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
162         if(DEBUGLEVEL >= 10) {
163                 char *keystr = hex_encode(NULL, (unsigned char*)key.dptr, key.dsize);
164                 DEBUG(10, (DEBUGLEVEL > 10
165                            ? "Locking key %s\n" : "Locking key %.20s\n",
166                            keystr));
167                 TALLOC_FREE(keystr);
168         }
169
170         /*
171          * we only support modifications within a
172          * started transaction.
173          */
174         if (ctx->transaction == 0) {
175                 DEBUG(0, ("db_tdb2_fetch_locked[%s]: no transaction started\n",
176                           ctx->name));
177                 smb_panic("no transaction");
178                 return NULL;
179         }
180
181         state.mem_ctx = mem_ctx;
182         state.result = NULL;
183
184         tdb_parse_record(ctx->mtdb->tdb, key, db_tdb2_fetchlock_parse, &state);
185
186         if (state.result == NULL) {
187                 db_tdb2_fetchlock_parse(key, tdb_null, &state);
188         }
189
190         if (state.result == NULL) {
191                 return NULL;
192         }
193
194         state.result->private_data = talloc_reference(state.result, ctx);
195         state.result->store = db_tdb2_store;
196         state.result->delete_rec = db_tdb2_delete;
197
198         DEBUG(10, ("Allocated locked data 0x%p\n", state.result));
199
200         return state.result;
201 }
202
203 struct tdb_fetch_state {
204         TALLOC_CTX *mem_ctx;
205         int result;
206         TDB_DATA data;
207 };
208
209 static int db_tdb2_fetch_parse(TDB_DATA key, TDB_DATA data,
210                                void *private_data)
211 {
212         struct tdb_fetch_state *state =
213                 (struct tdb_fetch_state *)private_data;
214
215         state->data.dptr = (uint8 *)talloc_memdup(state->mem_ctx, data.dptr,
216                                                   data.dsize);
217         if (state->data.dptr == NULL) {
218                 state->result = -1;
219                 return 0;
220         }
221
222         state->data.dsize = data.dsize;
223         return 0;
224 }
225
226 static void db_tdb2_resync_before_read(struct db_tdb2_ctx *db_ctx, TDB_DATA *kbuf)
227 {
228         if (db_ctx->mtdb) {
229                 return;
230         }
231
232         if (!db_ctx->out_of_sync) {
233                 return;
234         }
235
236         /*
237          * this function operates on the local copy,
238          * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
239          */
240         if (kbuf && (db_ctx->mseqkey.dsize == kbuf->dsize) &&
241             (memcmp(db_ctx->mseqkey.dptr, kbuf->dptr, kbuf->dsize) == 0)) {
242                 return;
243         }
244
245         DEBUG(0,("resync_before_read[%s/%s]\n",
246                  db_ctx->mtdb_path, db_ctx->ltdb_path));
247
248         db_tdb2_open_master(db_ctx, false, NULL);
249         db_tdb2_close_master(db_ctx);
250 }
251
252 static int db_tdb2_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
253                          TDB_DATA key, TDB_DATA *pdata)
254 {
255         struct db_tdb2_ctx *ctx = talloc_get_type_abort(
256                 db->private_data, struct db_tdb2_ctx);
257
258         struct tdb_fetch_state state;
259
260         db_tdb2_resync_before_read(ctx, &key);
261
262         if (ctx->out_of_sync) {
263                 DEBUG(0,("out of sync[%s] failing fetch\n",
264                          ctx->ltdb_path));
265                 errno = EIO;
266                 return -1;
267         }
268
269         state.mem_ctx = mem_ctx;
270         state.result = 0;
271         state.data = tdb_null;
272
273         tdb_parse_record(ctx->ltdb->tdb, key, db_tdb2_fetch_parse, &state);
274
275         if (state.result == -1) {
276                 return -1;
277         }
278
279         *pdata = state.data;
280         return 0;
281 }
282
283 static NTSTATUS db_tdb2_store(struct db_record *rec, TDB_DATA data, int flag)
284 {
285         struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
286                                                        struct db_tdb2_ctx);
287         int ret;
288
289         /*
290          * This has a bug: We need to replace rec->value for correct
291          * operation, but right now brlock and locking don't use the value
292          * anymore after it was stored.
293          */
294
295         /* first store it to the master copy */
296         ret = tdb_store(ctx->mtdb->tdb, rec->key, data, flag);
297         if (ret != 0) {
298                 return NT_STATUS_UNSUCCESSFUL;
299         }
300
301         /* then store it to the local copy */
302         ret = tdb_store(ctx->ltdb->tdb, rec->key, data, flag);
303         if (ret != 0) {
304                 /* try to restore the old value in the master copy */
305                 if (rec->value.dptr) {
306                         tdb_store(ctx->mtdb->tdb, rec->key,
307                                   rec->value, TDB_REPLACE);
308                 } else {
309                         tdb_delete(ctx->mtdb->tdb, rec->key);
310                 }
311                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
312         }
313
314         db_tdb2_queue_change(ctx, rec->key);
315
316         return NT_STATUS_OK;
317 }
318
319 static NTSTATUS db_tdb2_delete(struct db_record *rec)
320 {
321         struct db_tdb2_ctx *ctx = talloc_get_type_abort(rec->private_data,
322                                                        struct db_tdb2_ctx);
323         int ret;
324
325         ret = tdb_delete(ctx->mtdb->tdb, rec->key);
326         if (ret != 0) {
327                 if (tdb_error(ctx->mtdb->tdb) == TDB_ERR_NOEXIST) {
328                         return NT_STATUS_NOT_FOUND;
329                 }
330
331                 return NT_STATUS_UNSUCCESSFUL;
332         }
333
334         ret = tdb_delete(ctx->ltdb->tdb, rec->key);
335         if (ret != 0) {
336                 /* try to restore the value in the master copy */
337                 tdb_store(ctx->mtdb->tdb, rec->key,
338                           rec->value, TDB_REPLACE);
339                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
340         }
341
342         db_tdb2_queue_change(ctx, rec->key);
343
344         return NT_STATUS_OK;
345 }
346
/* State handed to the tdb traverse callbacks of this backend. */
struct db_tdb2_traverse_ctx {
	/* database being traversed */
	struct db_tdb2_ctx *db_ctx;
	/* caller supplied per-record function */
	int (*f)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to f */
	void *private_data;
};
352
353 static int db_tdb2_traverse_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
354                                 void *private_data)
355 {
356         struct db_tdb2_traverse_ctx *ctx =
357                 (struct db_tdb2_traverse_ctx *)private_data;
358         struct db_record rec;
359
360         /* this function operates on the master copy */
361
362         rec.key = kbuf;
363         rec.value = dbuf;
364         rec.store = db_tdb2_store;
365         rec.delete_rec = db_tdb2_delete;
366         rec.private_data = ctx->db_ctx;
367
368         return ctx->f(&rec, ctx->private_data);
369 }
370
371 static int db_tdb2_traverse(struct db_context *db,
372                            int (*f)(struct db_record *rec, void *private_data),
373                            void *private_data)
374 {
375         struct db_tdb2_ctx *db_ctx =
376                 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
377         struct db_tdb2_traverse_ctx ctx;
378
379         /*
380          * we only support modifications within a
381          * started transaction.
382          */
383         if (db_ctx->transaction == 0) {
384                 DEBUG(0, ("db_tdb2_traverse[%s]: no transaction started\n",
385                           db_ctx->name));
386                 smb_panic("no transaction");
387                 return -1;
388         }
389
390         /* here we traverse the master copy */
391         ctx.db_ctx = db_ctx;
392         ctx.f = f;
393         ctx.private_data = private_data;
394         return tdb_traverse(db_ctx->mtdb->tdb, db_tdb2_traverse_func, &ctx);
395 }
396
397 static NTSTATUS db_tdb2_store_deny(struct db_record *rec, TDB_DATA data, int flag)
398 {
399         return NT_STATUS_MEDIA_WRITE_PROTECTED;
400 }
401
402 static NTSTATUS db_tdb2_delete_deny(struct db_record *rec)
403 {
404         return NT_STATUS_MEDIA_WRITE_PROTECTED;
405 }
406
407 static int db_tdb2_traverse_read_func(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
408                                 void *private_data)
409 {
410         struct db_tdb2_traverse_ctx *ctx =
411                 (struct db_tdb2_traverse_ctx *)private_data;
412         struct db_record rec;
413
414         /*
415          * this function operates on the local copy,
416          * so hide the DB_TDB2_MASTER_SEQNUM_KEYSTR from the caller.
417          */
418         if ((ctx->db_ctx->mseqkey.dsize == kbuf.dsize) &&
419             (memcmp(ctx->db_ctx->mseqkey.dptr, kbuf.dptr, kbuf.dsize) == 0)) {
420                 return 0;
421         }
422
423         rec.key = kbuf;
424         rec.value = dbuf;
425         rec.store = db_tdb2_store_deny;
426         rec.delete_rec = db_tdb2_delete_deny;
427         rec.private_data = ctx->db_ctx;
428
429         return ctx->f(&rec, ctx->private_data);
430 }
431
432 static int db_tdb2_traverse_read(struct db_context *db,
433                            int (*f)(struct db_record *rec, void *private_data),
434                            void *private_data)
435 {
436         struct db_tdb2_ctx *db_ctx =
437                 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
438         struct db_tdb2_traverse_ctx ctx;
439         int ret;
440
441         db_tdb2_resync_before_read(db_ctx, NULL);
442
443         if (db_ctx->out_of_sync) {
444                 DEBUG(0,("out of sync[%s] failing traverse_read\n",
445                          db_ctx->ltdb_path));
446                 errno = EIO;
447                 return -1;
448         }
449
450         /* here we traverse the local copy */
451         ctx.db_ctx = db_ctx;
452         ctx.f = f;
453         ctx.private_data = private_data;
454         ret = tdb_traverse_read(db_ctx->ltdb->tdb, db_tdb2_traverse_read_func, &ctx);
455         if (ret > 0) {
456                 /* we have filtered one entry */
457                 ret--;
458         }
459
460         return ret;
461 }
462
463 static int db_tdb2_get_seqnum(struct db_context *db)
464
465 {
466         struct db_tdb2_ctx *db_ctx =
467                 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
468         uint32_t nlseq;
469         uint32_t nmseq;
470         bool ok;
471
472         nlseq = tdb_get_seqnum(db_ctx->ltdb->tdb);
473
474         if (nlseq == db_ctx->lseqnum) {
475                 return db_ctx->mseqnum;
476         }
477
478         ok = tdb_fetch_uint32_byblob(db_ctx->ltdb->tdb,
479                                      db_ctx->mseqkey,
480                                      &nmseq);
481         if (!ok) {
482                 /* TODO: what should we do here? */
483                 return db_ctx->mseqnum;
484         }
485
486         db_ctx->lseqnum = nlseq;
487         db_ctx->mseqnum = nmseq;
488
489         return db_ctx->mseqnum;
490 }
491
492 static int db_tdb2_transaction_start(struct db_context *db)
493 {
494         struct db_tdb2_ctx *db_ctx =
495                 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
496         int ret;
497
498         if (db_ctx->transaction) {
499                 db_ctx->transaction++;
500                 return 0;
501         }
502
503         /* we need to open the master tdb in order to */
504         ret = db_tdb2_open_master(db_ctx, true, NULL);
505         if (ret != 0) {
506                 return ret;
507         }
508
509         ret = tdb_transaction_start(db_ctx->ltdb->tdb);
510         if (ret != 0) {
511                 db_tdb2_close_master(db_ctx);
512                 return ret;
513         }
514
515         db_ctx->local_transaction = true;
516         db_ctx->transaction = 1;
517
518         return 0;
519 }
520
521 static void db_tdb2_queue_change(struct db_tdb2_ctx *db_ctx, const TDB_DATA key)
522 {
523         size_t size_needed = 4 + key.dsize;
524         size_t size_new = db_ctx->current_buffer_size + size_needed;
525         uint32_t i;
526         DATA_BLOB *keys;
527
528         db_ctx->changes.num_changes++;
529
530         if (db_ctx->changes.num_changes > 1 &&
531             db_ctx->changes.keys == NULL) {
532                 /*
533                  * this means we already overflowed
534                  */
535                 return;
536         }
537
538         if (db_ctx->changes.num_changes == 1) {
539                 db_ctx->changes.old_seqnum = db_ctx->mseqnum;
540         }
541
542         for (i=0; i < db_ctx->changes.num_keys; i++) {
543                 int ret;
544
545                 if (key.dsize != db_ctx->changes.keys[i].length) {
546                         continue;
547                 }
548                 ret = memcmp(key.dptr, db_ctx->changes.keys[i].data, key.dsize);
549                 if (ret != 0) {
550                         continue;
551                 }
552
553                 /*
554                  * the key is already in the list
555                  * so we're done
556                  */
557                 return;
558         }
559
560         if (db_ctx->max_buffer_size < size_new) {
561                 goto overflow;
562         }
563
564         keys = TALLOC_REALLOC_ARRAY(db_ctx, db_ctx->changes.keys,
565                                     DATA_BLOB,
566                                     db_ctx->changes.num_keys + 1);
567         if (!keys) {
568                 goto overflow;
569         }
570         db_ctx->changes.keys = keys;
571
572         keys[db_ctx->changes.num_keys].data = (uint8_t *)talloc_memdup(keys,
573                                                                 key.dptr,
574                                                                 key.dsize);
575         if (!keys[db_ctx->changes.num_keys].data) {
576                 goto overflow;
577         }
578         keys[db_ctx->changes.num_keys].length = key.dsize;
579         db_ctx->changes.num_keys++;
580         db_ctx->current_buffer_size = size_new;
581
582         return;
583
584 overflow:
585         /*
586          * on overflow discard the buffer and let
587          * the others reload the whole tdb
588          */
589         db_ctx->current_buffer_size = 0;
590         db_ctx->changes.num_keys = 0;
591         TALLOC_FREE(db_ctx->changes.keys);
592         return;
593 }
594
595 static void db_tdb2_send_notify(struct db_tdb2_ctx *db_ctx)
596 {
597         enum ndr_err_code ndr_err;
598         bool ok;
599         DATA_BLOB blob;
600         struct messaging_context *msg_ctx;
601         int num_msgs = 0;
602         struct server_id self = procid_self();
603
604         msg_ctx = db_tdb2_get_global_messaging_context();
605
606         db_ctx->changes.name = db_ctx->name;
607
608         DEBUG(10,("%s[%s] size[%u/%u] changes[%u] keys[%u] seqnum[%u=>%u]\n",
609                  __FUNCTION__,
610                  db_ctx->changes.name,
611                  db_ctx->current_buffer_size,
612                  db_ctx->max_buffer_size,
613                  db_ctx->changes.num_changes,
614                  db_ctx->changes.num_keys,
615                  db_ctx->changes.old_seqnum,
616                  db_ctx->changes.new_seqnum));
617
618         if (db_ctx->changes.num_changes == 0) {
619                 DEBUG(10,("db_tdb2_send_notify[%s]: no changes\n",
620                         db_ctx->changes.name));
621                 goto done;
622         }
623
624         if (!msg_ctx) {
625                 DEBUG(1,("db_tdb2_send_notify[%s]: skipped (no msg ctx)\n",
626                         db_ctx->changes.name));
627                 goto done;
628         }
629
630         ndr_err = ndr_push_struct_blob(
631                 &blob, talloc_tos(), &db_ctx->changes,
632                 (ndr_push_flags_fn_t)ndr_push_dbwrap_tdb2_changes);
633         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
634                 DEBUG(0,("db_tdb2_send_notify[%s]: failed to push changes: %s\n",
635                         db_ctx->changes.name,
636                         nt_errstr(ndr_map_error2ntstatus(ndr_err))));
637                 goto done;
638         }
639
640         ok = message_send_all(msg_ctx, MSG_DBWRAP_TDB2_CHANGES,
641                               blob.data, blob.length, &num_msgs);
642         if (!ok) {
643                 DEBUG(0,("db_tdb2_send_notify[%s]: failed to send changes\n",
644                         db_ctx->changes.name));
645                 goto done;
646         }
647
648         DEBUG(10,("db_tdb2_send_notify[%s]: pid %s send %u messages\n",
649                 db_ctx->name, procid_str_static(&self), num_msgs));
650
651 done:
652         TALLOC_FREE(db_ctx->changes.keys);
653         ZERO_STRUCT(db_ctx->changes);
654
655         return;
656 }
657
658 static void db_tdb2_receive_changes(struct messaging_context *msg,
659                                     void *private_data,
660                                     uint32_t msg_type,
661                                     struct server_id server_id,
662                                     DATA_BLOB *data)
663 {
664         enum ndr_err_code ndr_err;
665         struct dbwrap_tdb2_changes changes;
666         struct db_context *db;
667         struct server_id self;
668
669         if (procid_is_me(&server_id)) {
670                 DEBUG(0,("db_tdb2_receive_changes: ignore selfpacket\n"));
671                 return;
672         }
673
674         self = procid_self();
675
676         DEBUG(10,("db_tdb2_receive_changes: from %s to %s\n",
677                 procid_str(debug_ctx(), &server_id),
678                 procid_str(debug_ctx(), &self)));
679
680         ndr_err = ndr_pull_struct_blob_all(
681                 data, talloc_tos(), &changes,
682                 (ndr_pull_flags_fn_t)ndr_pull_dbwrap_tdb2_changes);
683         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
684                 DEBUG(0,("db_tdb2_receive_changes: failed to pull changes: %s\n",
685                         nt_errstr(ndr_map_error2ntstatus(ndr_err))));
686                 goto done;
687         }
688
689         if(DEBUGLEVEL >= 10) {
690                 NDR_PRINT_DEBUG(dbwrap_tdb2_changes, &changes);
691         }
692
693         /* open the db, this will sync it */
694         db = db_open_tdb2_ex(talloc_tos(), changes.name, 0,
695                              0, O_RDWR, 0600, &changes);
696         TALLOC_FREE(db);
697 done:
698         return;
699 }
700
701 static int db_tdb2_transaction_commit(struct db_context *db)
702 {
703         struct db_tdb2_ctx *db_ctx =
704                 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
705         int ret;
706         uint32_t mseqnum;
707
708         if (db_ctx->transaction == 0) {
709                 return -1;
710         } else if (db_ctx->transaction > 1) {
711                 db_ctx->transaction--;
712                 return 0;
713         }
714
715         mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
716         db_ctx->changes.new_seqnum = mseqnum;
717
718         /* first commit to the master copy */
719         ret = tdb_transaction_commit(db_ctx->mtdb->tdb);
720         db_ctx->master_transaction = false;
721         if (ret != 0) {
722                 int saved_errno = errno;
723                 db_tdb2_transaction_cancel(db);
724                 errno = saved_errno;
725                 return ret;
726         }
727
728         /*
729          * Note: as we've already commited the changes to the master copy
730          *       so we ignore errors in the following functions
731          */
732         ret = db_tdb2_commit_local(db_ctx, mseqnum);
733         if (ret == 0) {
734                 db_ctx->out_of_sync = false;
735         } else {
736                 db_ctx->out_of_sync = true;
737         }
738
739         db_ctx->transaction = 0;
740
741         db_tdb2_close_master(db_ctx);
742
743         db_tdb2_send_notify(db_ctx);
744
745         return 0;
746 }
747
748 static int db_tdb2_transaction_cancel(struct db_context *db)
749 {
750         struct db_tdb2_ctx *db_ctx =
751                 talloc_get_type_abort(db->private_data, struct db_tdb2_ctx);
752         int saved_errno;
753         int ret;
754
755         if (db_ctx->transaction == 0) {
756                 return -1;
757         }
758         if (db_ctx->transaction > 1) {
759                 db_ctx->transaction--;
760                 return 0;
761         }
762
763         /* cancel the transaction and close the master copy */
764         ret = db_tdb2_close_master(db_ctx);
765         saved_errno = errno;
766
767         /* now cancel on the local copy and ignore any error */
768         tdb_transaction_cancel(db_ctx->ltdb->tdb);
769         db_ctx->local_transaction = false;
770
771         db_ctx->transaction = 0;
772
773         errno = saved_errno;
774         return ret;
775 }
776
777 static int db_tdb2_open_master(struct db_tdb2_ctx *db_ctx, bool transaction,
778                                const struct dbwrap_tdb2_changes *changes)
779 {
780         int ret;
781
782         db_ctx->mtdb = tdb_wrap_open(db_ctx,
783                                      db_ctx->mtdb_path,
784                                      db_ctx->open.hash_size,
785                                      db_ctx->open.tdb_flags|TDB_NOMMAP|TDB_SEQNUM,
786                                      db_ctx->open.open_flags,
787                                      db_ctx->open.mode);
788         if (db_ctx->mtdb == NULL) {
789                 DEBUG(0, ("Could not open master tdb[%s]: %s\n",
790                           db_ctx->mtdb_path,
791                           strerror(errno)));
792                 return -1;
793         }
794         DEBUG(10,("open_master[%s]\n", db_ctx->mtdb_path));
795
796         if (!db_ctx->ltdb) {
797                 struct stat st;
798
799                 if (fstat(tdb_fd(db_ctx->mtdb->tdb), &st) == 0) {
800                         db_ctx->open.mode = st.st_mode;
801                 }
802
803                 /* make sure the local one uses the same hash size as the master one */
804                 db_ctx->open.hash_size = tdb_hash_size(db_ctx->mtdb->tdb);
805
806                 db_ctx->ltdb = tdb_wrap_open(db_ctx,
807                                              db_ctx->ltdb_path,
808                                              db_ctx->open.hash_size,
809                                              db_ctx->open.tdb_flags|TDB_SEQNUM,
810                                              db_ctx->open.open_flags|O_CREAT,
811                                              db_ctx->open.mode);
812                 if (db_ctx->ltdb == NULL) {
813                         DEBUG(0, ("Could not open local tdb[%s]: %s\n",
814                                   db_ctx->ltdb_path,
815                                   strerror(errno)));
816                         TALLOC_FREE(db_ctx->mtdb);
817                         return -1;
818                 }
819                 DEBUG(10,("open_local[%s]\n", db_ctx->ltdb_path));
820         }
821
822         if (transaction) {
823                 ret = tdb_transaction_start(db_ctx->mtdb->tdb);
824                 if (ret != 0) {
825                         DEBUG(0,("open failed to start transaction[%s]\n",
826                                  db_ctx->mtdb_path));
827                         db_tdb2_close_master(db_ctx);
828                         return ret;
829                 }
830                 db_ctx->master_transaction = true;
831         }
832
833         ret = db_tdb2_sync_from_master(db_ctx, changes);
834         if (ret != 0) {
835                 DEBUG(0,("open failed to sync from master[%s]\n",
836                          db_ctx->ltdb_path));
837                 db_tdb2_close_master(db_ctx);
838                 return ret;
839         }
840
841         return 0;
842 }
843
844 static int db_tdb2_commit_local(struct db_tdb2_ctx *db_ctx, uint32_t mseqnum)
845 {
846         bool ok;
847         int ret;
848
849         /* first fetch the master seqnum */
850         db_ctx->mseqnum = mseqnum;
851
852         /* now we try to store the master seqnum in the local tdb */
853         ok = tdb_store_uint32_byblob(db_ctx->ltdb->tdb,
854                                      db_ctx->mseqkey,
855                                      db_ctx->mseqnum);
856         if (!ok) {
857                 tdb_transaction_cancel(db_ctx->ltdb->tdb);
858                 db_ctx->local_transaction = false;
859                 DEBUG(0,("local failed[%s] store mseq[%u]\n",
860                          db_ctx->ltdb_path, db_ctx->mseqnum));
861                 return -1;
862         }
863
864         /* now commit all changes to the local tdb */
865         ret = tdb_transaction_commit(db_ctx->ltdb->tdb);
866         db_ctx->local_transaction = false;
867         if (ret != 0) {
868                 DEBUG(0,("local failed[%s] commit mseq[%u]\n",
869                          db_ctx->ltdb_path, db_ctx->mseqnum));
870                 return ret;
871         }
872
873         /*
874          * and update the cached local seqnum this is needed to
875          * let us cache the master seqnum.
876          */
877         db_ctx->lseqnum = tdb_get_seqnum(db_ctx->ltdb->tdb);
878         DEBUG(10,("local updated[%s] mseq[%u]\n",
879                   db_ctx->ltdb_path, db_ctx->mseqnum));
880
881         return 0;
882 }
883
884 static int db_tdb2_close_master(struct db_tdb2_ctx *db_ctx)
885 {
886         if (db_ctx->master_transaction) {
887                 tdb_transaction_cancel(db_ctx->mtdb->tdb);
888         }
889         db_ctx->master_transaction = false;
890         /* now we can close the master handle */
891         TALLOC_FREE(db_ctx->mtdb);
892
893         DEBUG(10,("close_master[%s] ok\n", db_ctx->mtdb_path));
894         return 0;
895 }
896
897 static int db_tdb2_traverse_sync_all_func(TDB_CONTEXT *tdb,
898                                           TDB_DATA kbuf, TDB_DATA dbuf,
899                                           void *private_data)
900 {
901         struct db_tdb2_traverse_ctx *ctx =
902                 (struct db_tdb2_traverse_ctx *)private_data;
903         uint32_t *seqnum = (uint32_t *)ctx->private_data;
904         int ret;
905
906         DEBUG(10,("sync_entry[%s]\n", ctx->db_ctx->mtdb_path));
907
908         /* Do not accidently allocate/deallocate w/o need when debug level is lower than needed */
909         if(DEBUGLEVEL >= 10) {
910                 char *keystr = hex_encode(NULL, (unsigned char*)kbuf.dptr, kbuf.dsize);
911                 DEBUG(10, (DEBUGLEVEL > 10
912                            ? "Locking key %s\n" : "Locking key %.20s\n",
913                            keystr));
914                 TALLOC_FREE(keystr);
915         }
916
917         ret = tdb_store(ctx->db_ctx->ltdb->tdb, kbuf, dbuf, TDB_INSERT);
918         if (ret != 0) {
919                 DEBUG(0,("sync_entry[%s] %d: %s\n",
920                         ctx->db_ctx->ltdb_path, ret,
921                         tdb_errorstr(ctx->db_ctx->ltdb->tdb)));
922                 return ret;
923         }
924
925         *seqnum = tdb_get_seqnum(ctx->db_ctx->mtdb->tdb);
926
927         return 0;
928 }
929
930 static int db_tdb2_sync_all(struct db_tdb2_ctx *db_ctx, uint32_t *seqnum)
931 {
932         struct db_tdb2_traverse_ctx ctx;
933         int ret;
934
935         ret = tdb_wipe_all(db_ctx->ltdb->tdb);
936         if (ret != 0) {
937                 DEBUG(0,("tdb_wipe_all[%s] failed %d: %s\n",
938                          db_ctx->ltdb_path, ret,
939                          tdb_errorstr(db_ctx->ltdb->tdb)));
940                 return ret;
941         }
942
943         ctx.db_ctx = db_ctx;
944         ctx.f = NULL;
945         ctx.private_data = seqnum;
946         ret = tdb_traverse_read(db_ctx->mtdb->tdb,
947                                 db_tdb2_traverse_sync_all_func,
948                                 &ctx);
949         DEBUG(10,("db_tdb2_sync_all[%s] count[%d]\n",
950                   db_ctx->mtdb_path, ret));
951         if (ret < 0) {
952                 return ret;
953         }
954
955         return 0;
956 }
957
958 static int db_tdb2_sync_changes(struct db_tdb2_ctx *db_ctx,
959                                 const struct dbwrap_tdb2_changes *changes,
960                                 uint32_t *seqnum)
961 {
962         uint32_t cseqnum;
963         uint32_t mseqnum;
964         uint32_t i;
965         int ret;
966         bool need_full_sync = false;
967
968         DEBUG(10,("db_tdb2_sync_changes[%s] changes[%u]\n",
969                   changes->name, changes->num_changes));
970         if(DEBUGLEVEL >= 10) {
971                 NDR_PRINT_DEBUG(dbwrap_tdb2_changes, discard_const(changes));
972         }
973
974         /* for the master tdb for reading */
975         ret = tdb_lockall_read(db_ctx->mtdb->tdb);
976         if (ret != 0) {
977                 DEBUG(0,("tdb_lockall_read[%s] %d\n", db_ctx->mtdb_path, ret));
978                 return ret;
979         }
980
981         /* first fetch seqnum we know about */
982         cseqnum = db_tdb2_get_seqnum(db_ctx->db);
983
984         /* then fetch the master seqnum */
985         mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
986
987         if (cseqnum == mseqnum) {
988                 DEBUG(10,("db_tdb2_sync_changes[%s] uptodate[%u]\n",
989                           db_ctx->mtdb_path, mseqnum));
990                 /* we hit a race before and now noticed we're uptodate */
991                 goto done;
992         }
993
994         /* now see if the changes describe what we need */
995         if (changes->old_seqnum != cseqnum) {
996                 need_full_sync = true;
997         }
998
999         if (changes->new_seqnum != mseqnum) {
1000                 need_full_sync = true;
1001         }
1002
1003         /* this was the overflow case */
1004         if (changes->num_keys == 0) {
1005                 need_full_sync = true;
1006         }
1007
1008         if (need_full_sync) {
1009                 tdb_unlockall_read(db_ctx->mtdb->tdb);
1010                 DEBUG(0,("fallback to full sync[%s] seq[%u=>%u] keys[%u]\n",
1011                         db_ctx->ltdb_path, cseqnum, mseqnum,
1012                         changes->num_keys));
1013                 return db_tdb2_sync_all(db_ctx, &mseqnum);
1014         }
1015
1016         for (i=0; i < changes->num_keys; i++) {
1017                 const char *op = NULL;
1018                 bool del = false;
1019                 TDB_DATA key;
1020                 TDB_DATA val;
1021
1022                 key.dsize = changes->keys[i].length;
1023                 key.dptr = changes->keys[i].data;
1024
1025                 val = tdb_fetch(db_ctx->mtdb->tdb, key);
1026                 ret = tdb_error(db_ctx->mtdb->tdb);
1027                 if (ret == TDB_ERR_NOEXIST) {
1028                         del = true;
1029                 } else if (ret != 0) {
1030                         DEBUG(0,("sync_changes[%s] failure %d\n",
1031                                  db_ctx->mtdb_path, ret));
1032                         goto failed;
1033                 }
1034
1035                 if (del) {
1036                         op = "delete";
1037                         ret = tdb_delete(db_ctx->ltdb->tdb, key);
1038                         DEBUG(10,("sync_changes[%s] delete key[%u] %d\n",
1039                                   db_ctx->mtdb_path, i, ret));
1040                 } else {
1041                         op = "store";
1042                         ret = tdb_store(db_ctx->ltdb->tdb, key,
1043                                         val, TDB_REPLACE);
1044                         DEBUG(10,("sync_changes[%s] store key[%u] %d\n",
1045                                   db_ctx->mtdb_path, i, ret));
1046                 }
1047                 SAFE_FREE(val.dptr);
1048                 if (ret != 0) {
1049                         DEBUG(0,("sync_changes[%s] %s key[%u] failed %d\n",
1050                                  db_ctx->mtdb_path, op, i, ret));
1051                         goto failed;
1052                 }
1053         }
1054
1055 done:
1056         tdb_unlockall_read(db_ctx->mtdb->tdb);
1057
1058         *seqnum = mseqnum;
1059         return 0;
1060 failed:
1061         tdb_unlockall_read(db_ctx->mtdb->tdb);
1062         return ret;
1063 }
1064
1065 static int db_tdb2_sync_from_master(struct db_tdb2_ctx *db_ctx,
1066                                     const struct dbwrap_tdb2_changes *changes)
1067 {
1068         int ret;
1069         uint32_t cseqnum;
1070         uint32_t mseqnum;
1071         bool force = false;
1072
1073         /* first fetch seqnum we know about */
1074         cseqnum = db_tdb2_get_seqnum(db_ctx->db);
1075
1076         /* then fetch the master seqnum */
1077         mseqnum = tdb_get_seqnum(db_ctx->mtdb->tdb);
1078
1079         if (db_ctx->lseqnum == 0) {
1080                 force = true;
1081         }
1082
1083         if (!force && cseqnum == mseqnum) {
1084                 DEBUG(10,("uptodate[%s] mseq[%u]\n",
1085                           db_ctx->ltdb_path, mseqnum));
1086                 /* the local copy is uptodate, close the master db */
1087                 return 0;
1088         }
1089         DEBUG(10,("not uptodate[%s] seq[%u=>%u]\n",
1090                   db_ctx->ltdb_path, cseqnum, mseqnum));
1091
1092         ret = tdb_transaction_start(db_ctx->ltdb->tdb);
1093         if (ret != 0) {
1094                 DEBUG(0,("failed to start transaction[%s] %d: %s\n",
1095                          db_ctx->ltdb_path, ret,
1096                          tdb_errorstr(db_ctx->ltdb->tdb)));
1097                 db_ctx->out_of_sync = true;
1098                 return ret;
1099         }
1100         db_ctx->local_transaction = true;
1101
1102         if (changes && !force) {
1103                 ret = db_tdb2_sync_changes(db_ctx, changes, &mseqnum);
1104                 if (ret != 0) {
1105                         db_ctx->out_of_sync = true;
1106                         tdb_transaction_cancel(db_ctx->ltdb->tdb);
1107                         db_ctx->local_transaction = false;
1108                         return ret;
1109                 }
1110         } else {
1111                 ret = db_tdb2_sync_all(db_ctx, &mseqnum);
1112                 if (ret != 0) {
1113                         db_ctx->out_of_sync = true;
1114                         tdb_transaction_cancel(db_ctx->ltdb->tdb);
1115                         db_ctx->local_transaction = false;
1116                         return ret;
1117                 }
1118         }
1119
1120         ret = db_tdb2_commit_local(db_ctx, mseqnum);
1121         if (ret != 0) {
1122                 db_ctx->out_of_sync = true;
1123                 return ret;
1124         }
1125
1126         db_ctx->out_of_sync = false;
1127
1128         return 0;
1129 }
1130
1131 static int db_tdb2_ctx_destructor(struct db_tdb2_ctx *db_tdb2)
1132 {
1133         db_tdb2_close_master(db_tdb2);
1134         if (db_tdb2->local_transaction) {
1135                 tdb_transaction_cancel(db_tdb2->ltdb->tdb);
1136         }
1137         db_tdb2->local_transaction = false;
1138         TALLOC_FREE(db_tdb2->ltdb);
1139         return 0;
1140 }
1141
1142 static struct db_context *db_open_tdb2_ex(TALLOC_CTX *mem_ctx,
1143                                           const char *name,
1144                                           int hash_size, int tdb_flags,
1145                                           int open_flags, mode_t mode,
1146                                           const struct dbwrap_tdb2_changes *chgs)
1147 {
1148         struct db_context *result = NULL;
1149         struct db_tdb2_ctx *db_tdb2;
1150         int ret;
1151         const char *md;
1152         const char *ld;
1153         const char *bn;
1154
1155         bn = strrchr_m(name, '/');
1156         if (bn) {
1157                 bn++;
1158                 DEBUG(3,("db_open_tdb2: use basename[%s] of abspath[%s]:\n",
1159                         bn, name));
1160         } else {
1161                 bn = name;
1162         }
1163
1164         md = lp_parm_const_string(-1, "dbwrap_tdb2", "master directory", NULL);
1165         if (!md) {
1166                 DEBUG(0,("'dbwrap_tdb2:master directory' empty\n"));
1167                 goto fail;
1168         }
1169
1170         ld = lp_parm_const_string(-1, "dbwrap_tdb2", "local directory", NULL);
1171         if (!ld) {
1172                 DEBUG(0,("'dbwrap_tdb2:local directory' empty\n"));
1173                 goto fail;
1174         }
1175
1176         result = TALLOC_ZERO_P(mem_ctx, struct db_context);
1177         if (result == NULL) {
1178                 DEBUG(0, ("talloc failed\n"));
1179                 goto fail;
1180         }
1181
1182         result->private_data = db_tdb2 = TALLOC_ZERO_P(result, struct db_tdb2_ctx);
1183         if (db_tdb2 == NULL) {
1184                 DEBUG(0, ("talloc failed\n"));
1185                 goto fail;
1186         }
1187
1188         db_tdb2->db = result;
1189
1190         db_tdb2->open.hash_size = hash_size;
1191         db_tdb2->open.tdb_flags = tdb_flags;
1192         db_tdb2->open.open_flags= open_flags;
1193         db_tdb2->open.mode      = mode;
1194
1195         db_tdb2->max_buffer_size = lp_parm_ulong(-1, "dbwrap_tdb2",
1196                                                  "notify buffer size", 512);
1197
1198         db_tdb2->name = talloc_strdup(db_tdb2, bn);
1199         if (db_tdb2->name == NULL) {
1200                 DEBUG(0, ("talloc_strdup failed\n"));
1201                 goto fail;
1202         }
1203
1204         db_tdb2->mtdb_path = talloc_asprintf(db_tdb2, "%s/%s",
1205                                              md, bn);
1206         if (db_tdb2->mtdb_path == NULL) {
1207                 DEBUG(0, ("talloc_asprintf failed\n"));
1208                 goto fail;
1209         }
1210
1211         db_tdb2->ltdb_path = talloc_asprintf(db_tdb2, "%s/%s.tdb2",
1212                                              ld, bn);
1213         if (db_tdb2->ltdb_path == NULL) {
1214                 DEBUG(0, ("talloc_asprintf failed\n"));
1215                 goto fail;
1216         }
1217
1218         db_tdb2->mseqkey = string_term_tdb_data(DB_TDB2_MASTER_SEQNUM_KEYSTR);
1219
1220         /*
1221          * this implicit opens the local one if as it's not yet open
1222          * it syncs the local copy.
1223          */
1224         ret = db_tdb2_open_master(db_tdb2, false, chgs);
1225         if (ret != 0) {
1226                 goto fail;
1227         }
1228
1229         ret = db_tdb2_close_master(db_tdb2);
1230         if (ret != 0) {
1231                 goto fail;
1232         }
1233
1234         DEBUG(10,("db_open_tdb2[%s] opened with mseq[%u]\n",
1235                   db_tdb2->name, db_tdb2->mseqnum));
1236
1237         result->fetch_locked = db_tdb2_fetch_locked;
1238         result->fetch = db_tdb2_fetch;
1239         result->traverse = db_tdb2_traverse;
1240         result->traverse_read = db_tdb2_traverse_read;
1241         result->get_seqnum = db_tdb2_get_seqnum;
1242         result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1243         result->transaction_start = db_tdb2_transaction_start;
1244         result->transaction_commit = db_tdb2_transaction_commit;
1245         result->transaction_cancel = db_tdb2_transaction_cancel;
1246
1247         talloc_set_destructor(db_tdb2, db_tdb2_ctx_destructor);
1248
1249         return result;
1250
1251  fail:
1252         if (result != NULL) {
1253                 TALLOC_FREE(result);
1254         }
1255         return NULL;
1256 }
1257
1258 struct db_context *db_open_tdb2(TALLOC_CTX *mem_ctx,
1259                                 const char *name,
1260                                 int hash_size, int tdb_flags,
1261                                 int open_flags, mode_t mode)
1262 {
1263         return db_open_tdb2_ex(mem_ctx, name, hash_size,
1264                                tdb_flags, open_flags, mode, NULL);
1265 }