selftest: force 'client use krb5 netlogon = yes' for admem_idmap_autorid
[samba.git] / lib / ldb / ldb_mdb / ldb_mdb.c
1 /*
2    ldb database library using mdb back end
3
4    Copyright (C) Jakub Hrozek 2014
5    Copyright (C) Catalyst.Net Ltd 2017
6
7      ** NOTE! The following LGPL license applies to the ldb
8      ** library. This does NOT imply that all of Samba is released
9      ** under the LGPL
10
11    This library is free software; you can redistribute it and/or
12    modify it under the terms of the GNU Lesser General Public
13    License as published by the Free Software Foundation; either
14    version 3 of the License, or (at your option) any later version.
15
16    This library is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19    Lesser General Public License for more details.
20
21    You should have received a copy of the GNU Lesser General Public
22    License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 */
24
25 #include "ldb_mdb.h"
26 #include "../ldb_key_value/ldb_kv.h"
27 #include "include/dlinklist.h"
28
29 #define MDB_URL_PREFIX          "mdb://"
30 #define MDB_URL_PREFIX_SIZE     (sizeof(MDB_URL_PREFIX)-1)
31
32 #define LDB_MDB_MAX_KEY_LENGTH 511
33
34 #define GIGABYTE (1024*1024*1024)
35
36 int ldb_mdb_err_map(int lmdb_err)
37 {
38         switch (lmdb_err) {
39         case MDB_SUCCESS:
40                 return LDB_SUCCESS;
41         case EIO:
42                 return LDB_ERR_OPERATIONS_ERROR;
43 #ifdef EBADE
44         case EBADE:
45 #endif
46         case MDB_INCOMPATIBLE:
47         case MDB_CORRUPTED:
48         case MDB_INVALID:
49                 return LDB_ERR_UNAVAILABLE;
50         case MDB_BAD_TXN:
51         case MDB_BAD_VALSIZE:
52 #ifdef MDB_BAD_DBI
53         case MDB_BAD_DBI:
54 #endif
55         case MDB_PANIC:
56         case EINVAL:
57                 return LDB_ERR_PROTOCOL_ERROR;
58         case MDB_MAP_FULL:
59         case MDB_DBS_FULL:
60         case MDB_READERS_FULL:
61         case MDB_TLS_FULL:
62         case MDB_TXN_FULL:
63         case EAGAIN:
64                 return LDB_ERR_BUSY;
65         case MDB_KEYEXIST:
66                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
67         case MDB_NOTFOUND:
68         case ENOENT:
69                 return LDB_ERR_NO_SUCH_OBJECT;
70         case EACCES:
71                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
72         default:
73                 break;
74         }
75         return LDB_ERR_OTHER;
76 }
77
78 #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
79 static int lmdb_error_at(struct ldb_context *ldb,
80                          int ecode,
81                          const char *file,
82                          int line)
83 {
84         int ldb_err = ldb_mdb_err_map(ecode);
85         char *reason = mdb_strerror(ecode);
86         ldb_asprintf_errstring(ldb,
87                                "(%d) - %s at %s:%d",
88                                ecode,
89                                reason,
90                                file,
91                                line);
92         return ldb_err;
93 }
94
95 static bool lmdb_transaction_active(struct ldb_kv_private *ldb_kv)
96 {
97         return ldb_kv->lmdb_private->txlist != NULL;
98 }
99
100 static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
101 {
102         if (ltx == NULL) {
103                 return NULL;
104         }
105
106         return ltx->tx;
107 }
108
109 static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
110 {
111         if (lmdb->txlist) {
112                 talloc_steal(lmdb->txlist, ltx);
113         }
114
115         DLIST_ADD(lmdb->txlist, ltx);
116 }
117
118 static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
119 {
120         DLIST_REMOVE(lmdb->txlist, ltx);
121         talloc_free(ltx);
122 }
123
124
125 static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
126 {
127         struct lmdb_trans *ltx;
128
129         ltx = lmdb->txlist;
130         return ltx;
131 }
132
133
134 static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
135 {
136         MDB_txn *txn = NULL;
137
138         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
139         if (txn != NULL) {
140                 return txn;
141         }
142         if (lmdb->read_txn != NULL) {
143                 return lmdb->read_txn;
144         }
145         lmdb->error = MDB_BAD_TXN;
146         ldb_set_errstring(lmdb->ldb, __location__":No active transaction\n");
147         return NULL;
148 }
149
150 static int lmdb_store(struct ldb_kv_private *ldb_kv,
151                       struct ldb_val key,
152                       struct ldb_val data,
153                       int flags)
154 {
155         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
156         MDB_val mdb_key;
157         MDB_val mdb_data;
158         int mdb_flags;
159         MDB_txn *txn = NULL;
160         MDB_dbi dbi = 0;
161
162         if (ldb_kv->read_only) {
163                 return LDB_ERR_UNWILLING_TO_PERFORM;
164         }
165
166         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
167         if (txn == NULL) {
168                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
169                 lmdb->error = MDB_PANIC;
170                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
171         }
172
173         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
174         if (lmdb->error != MDB_SUCCESS) {
175                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
176         }
177
178         mdb_key.mv_size = key.length;
179         mdb_key.mv_data = key.data;
180
181         mdb_data.mv_size = data.length;
182         mdb_data.mv_data = data.data;
183
184         if (flags == TDB_INSERT) {
185                 mdb_flags = MDB_NOOVERWRITE;
186         } else if ((flags == TDB_MODIFY)) {
187                 /*
188                  * Modifying a record, ensure that it exists.
189                  * This mimics the TDB semantics
190                  */
191                 MDB_val value;
192                 lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
193                 if (lmdb->error != MDB_SUCCESS) {
194                         return ldb_mdb_error(lmdb->ldb, lmdb->error);
195                 }
196                 mdb_flags = 0;
197         } else {
198                 mdb_flags = 0;
199         }
200
201         lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
202         if (lmdb->error != MDB_SUCCESS) {
203                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
204         }
205
206         return ldb_mdb_err_map(lmdb->error);
207 }
208
209 static int lmdb_delete(struct ldb_kv_private *ldb_kv, struct ldb_val key)
210 {
211         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
212         MDB_val mdb_key;
213         MDB_txn *txn = NULL;
214         MDB_dbi dbi = 0;
215
216         if (ldb_kv->read_only) {
217                 return LDB_ERR_UNWILLING_TO_PERFORM;
218         }
219
220         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
221         if (txn == NULL) {
222                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
223                 lmdb->error = MDB_PANIC;
224                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
225         }
226
227         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
228         if (lmdb->error != MDB_SUCCESS) {
229                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
230         }
231
232         mdb_key.mv_size = key.length;
233         mdb_key.mv_data = key.data;
234
235         lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
236         if (lmdb->error != MDB_SUCCESS) {
237                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
238         }
239         return ldb_mdb_err_map(lmdb->error);
240 }
241
242 static int lmdb_traverse_fn(struct ldb_kv_private *ldb_kv,
243                             ldb_kv_traverse_fn fn,
244                             void *ctx)
245 {
246         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
247         MDB_val mdb_key;
248         MDB_val mdb_data;
249         MDB_txn *txn = NULL;
250         MDB_dbi dbi = 0;
251         MDB_cursor *cursor = NULL;
252         int ret;
253
254         txn = get_current_txn(lmdb);
255         if (txn == NULL) {
256                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
257                 lmdb->error = MDB_PANIC;
258                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
259         }
260
261         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
262         if (lmdb->error != MDB_SUCCESS) {
263                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
264         }
265
266         lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
267         if (lmdb->error != MDB_SUCCESS) {
268                 goto done;
269         }
270
271         while ((lmdb->error = mdb_cursor_get(
272                         cursor, &mdb_key,
273                         &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
274
275                 struct ldb_val key = {
276                         .length = mdb_key.mv_size,
277                         .data = mdb_key.mv_data,
278                 };
279                 struct ldb_val data = {
280                         .length = mdb_data.mv_size,
281                         .data = mdb_data.mv_data,
282                 };
283
284                 ret = fn(ldb_kv, key, data, ctx);
285                 if (ret != 0) {
286                         goto done;
287                 }
288         }
289         if (lmdb->error == MDB_NOTFOUND) {
290                 lmdb->error = MDB_SUCCESS;
291         }
292 done:
293         if (cursor != NULL) {
294                 mdb_cursor_close(cursor);
295         }
296
297         if (lmdb->error != MDB_SUCCESS) {
298                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
299         }
300         return ldb_mdb_err_map(lmdb->error);
301 }
302
303 static int lmdb_update_in_iterate(struct ldb_kv_private *ldb_kv,
304                                   struct ldb_val key,
305                                   struct ldb_val key2,
306                                   struct ldb_val data,
307                                   void *state)
308 {
309         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
310         struct ldb_val copy;
311         int ret = LDB_SUCCESS;
312
313         /*
314          * Need to take a copy of the data as the delete operation alters the
315          * data, as it is in private lmdb memory.
316          */
317         copy.length = data.length;
318         copy.data = talloc_memdup(ldb_kv, data.data, data.length);
319         if (copy.data == NULL) {
320                 lmdb->error = MDB_PANIC;
321                 return ldb_oom(lmdb->ldb);
322         }
323
324         lmdb->error = lmdb_delete(ldb_kv, key);
325         if (lmdb->error != MDB_SUCCESS) {
326                 ldb_debug(
327                         lmdb->ldb,
328                         LDB_DEBUG_ERROR,
329                         "Failed to delete %*.*s "
330                         "for rekey as %*.*s: %s",
331                         (int)key.length, (int)key.length,
332                         (const char *)key.data,
333                         (int)key2.length, (int)key2.length,
334                         (const char *)key.data,
335                         mdb_strerror(lmdb->error));
336                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
337                 goto done;
338         }
339
340         lmdb->error = lmdb_store(ldb_kv, key2, copy, 0);
341         if (lmdb->error != MDB_SUCCESS) {
342                 ldb_debug(
343                         lmdb->ldb,
344                         LDB_DEBUG_ERROR,
345                         "Failed to rekey %*.*s as %*.*s: %s",
346                         (int)key.length, (int)key.length,
347                         (const char *)key.data,
348                         (int)key2.length, (int)key2.length,
349                         (const char *)key.data,
350                         mdb_strerror(lmdb->error));
351                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
352                 goto done;
353         }
354
355 done:
356         if (copy.data != NULL) {
357                 TALLOC_FREE(copy.data);
358                 copy.length = 0;
359         }
360
361         /*
362          * Explicity invalidate the data, as the delete has done this
363          */
364         data.length = 0;
365         data.data = NULL;
366
367         return ret;
368 }
369
370 /* Handles only a single record */
371 static int lmdb_parse_record(struct ldb_kv_private *ldb_kv,
372                              struct ldb_val key,
373                              int (*parser)(struct ldb_val key,
374                                            struct ldb_val data,
375                                            void *private_data),
376                              void *ctx)
377 {
378         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
379         MDB_val mdb_key;
380         MDB_val mdb_data;
381         MDB_txn *txn = NULL;
382         MDB_dbi dbi;
383         struct ldb_val data;
384
385         txn = get_current_txn(lmdb);
386         if (txn == NULL) {
387                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
388                 lmdb->error = MDB_PANIC;
389                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
390         }
391
392         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
393         if (lmdb->error != MDB_SUCCESS) {
394                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
395         }
396
397         mdb_key.mv_size = key.length;
398         mdb_key.mv_data = key.data;
399
400         lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
401         if (lmdb->error != MDB_SUCCESS) {
402                 /* TODO closing a handle should not even be necessary */
403                 mdb_dbi_close(lmdb->env, dbi);
404                 if (lmdb->error == MDB_NOTFOUND) {
405                         return LDB_ERR_NO_SUCH_OBJECT;
406                 }
407                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
408         }
409         data.data = mdb_data.mv_data;
410         data.length = mdb_data.mv_size;
411
412         /* TODO closing a handle should not even be necessary */
413         mdb_dbi_close(lmdb->env, dbi);
414
415         return parser(key, data, ctx);
416 }
417
418
419 static int lmdb_lock_read(struct ldb_module *module)
420 {
421         void *data = ldb_module_get_private(module);
422         struct ldb_kv_private *ldb_kv =
423             talloc_get_type(data, struct ldb_kv_private);
424         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
425         pid_t pid = getpid();
426
427         if (pid != lmdb->pid) {
428                 ldb_asprintf_errstring(
429                         lmdb->ldb,
430                         __location__": Reusing ldb opened by pid %d in "
431                         "process %d\n",
432                         lmdb->pid,
433                         pid);
434                 lmdb->error = MDB_BAD_TXN;
435                 return LDB_ERR_PROTOCOL_ERROR;
436         }
437
438         lmdb->error = MDB_SUCCESS;
439         if (lmdb_transaction_active(ldb_kv) == false &&
440             ldb_kv->read_lock_count == 0) {
441                 lmdb->error = mdb_txn_begin(lmdb->env,
442                                             NULL,
443                                             MDB_RDONLY,
444                                             &lmdb->read_txn);
445         }
446         if (lmdb->error != MDB_SUCCESS) {
447                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
448         }
449
450         ldb_kv->read_lock_count++;
451         return ldb_mdb_err_map(lmdb->error);
452 }
453
454 static int lmdb_unlock_read(struct ldb_module *module)
455 {
456         void *data = ldb_module_get_private(module);
457         struct ldb_kv_private *ldb_kv =
458             talloc_get_type(data, struct ldb_kv_private);
459
460         if (lmdb_transaction_active(ldb_kv) == false &&
461             ldb_kv->read_lock_count == 1) {
462                 struct lmdb_private *lmdb = ldb_kv->lmdb_private;
463                 mdb_txn_commit(lmdb->read_txn);
464                 lmdb->read_txn = NULL;
465                 ldb_kv->read_lock_count--;
466                 return LDB_SUCCESS;
467         }
468         ldb_kv->read_lock_count--;
469         return LDB_SUCCESS;
470 }
471
472 static int lmdb_transaction_start(struct ldb_kv_private *ldb_kv)
473 {
474         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
475         struct lmdb_trans *ltx;
476         struct lmdb_trans *ltx_head;
477         MDB_txn *tx_parent;
478         pid_t pid = getpid();
479
480         /* Do not take out the transaction lock on a read-only DB */
481         if (ldb_kv->read_only) {
482                 return LDB_ERR_UNWILLING_TO_PERFORM;
483         }
484
485         ltx = talloc_zero(lmdb, struct lmdb_trans);
486         if (ltx == NULL) {
487                 return ldb_oom(lmdb->ldb);
488         }
489
490         if (pid != lmdb->pid) {
491                 ldb_asprintf_errstring(
492                         lmdb->ldb,
493                         __location__": Reusing ldb opened by pid %d in "
494                         "process %d\n",
495                         lmdb->pid,
496                         pid);
497                 lmdb->error = MDB_BAD_TXN;
498                 return LDB_ERR_PROTOCOL_ERROR;
499         }
500
501         ltx_head = lmdb_private_trans_head(lmdb);
502
503         tx_parent = lmdb_trans_get_tx(ltx_head);
504
505         lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
506         if (lmdb->error != MDB_SUCCESS) {
507                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
508         }
509
510         trans_push(lmdb, ltx);
511
512         return ldb_mdb_err_map(lmdb->error);
513 }
514
515 static int lmdb_transaction_cancel(struct ldb_kv_private *ldb_kv)
516 {
517         struct lmdb_trans *ltx;
518         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
519
520         ltx = lmdb_private_trans_head(lmdb);
521         if (ltx == NULL) {
522                 return LDB_ERR_OPERATIONS_ERROR;
523         }
524
525         mdb_txn_abort(ltx->tx);
526         trans_finished(lmdb, ltx);
527         return LDB_SUCCESS;
528 }
529
530 static int lmdb_transaction_prepare_commit(struct ldb_kv_private *ldb_kv)
531 {
532         /* No need to prepare a commit */
533         return LDB_SUCCESS;
534 }
535
536 static int lmdb_transaction_commit(struct ldb_kv_private *ldb_kv)
537 {
538         struct lmdb_trans *ltx;
539         struct lmdb_private *lmdb = ldb_kv->lmdb_private;
540
541         ltx = lmdb_private_trans_head(lmdb);
542         if (ltx == NULL) {
543                 return LDB_ERR_OPERATIONS_ERROR;
544         }
545
546         lmdb->error = mdb_txn_commit(ltx->tx);
547         trans_finished(lmdb, ltx);
548
549         return lmdb->error;
550 }
551
552 static int lmdb_error(struct ldb_kv_private *ldb_kv)
553 {
554         return ldb_mdb_err_map(ldb_kv->lmdb_private->error);
555 }
556
557 static const char *lmdb_errorstr(struct ldb_kv_private *ldb_kv)
558 {
559         return mdb_strerror(ldb_kv->lmdb_private->error);
560 }
561
562 static const char *lmdb_name(struct ldb_kv_private *ldb_kv)
563 {
564         return "lmdb";
565 }
566
567 static bool lmdb_changed(struct ldb_kv_private *ldb_kv)
568 {
569         /*
570          * lmdb does no provide a quick way to determine if the database
571          * has changed.  This function always returns true.
572          *
573          * Note that tdb uses a sequence number that allows this function
574          * to be implemented efficiently.
575          */
576         return true;
577 }
578
579 static struct kv_db_ops lmdb_key_value_ops = {
580         .store              = lmdb_store,
581         .delete             = lmdb_delete,
582         .iterate            = lmdb_traverse_fn,
583         .update_in_iterate  = lmdb_update_in_iterate,
584         .fetch_and_parse    = lmdb_parse_record,
585         .lock_read          = lmdb_lock_read,
586         .unlock_read        = lmdb_unlock_read,
587         .begin_write        = lmdb_transaction_start,
588         .prepare_write      = lmdb_transaction_prepare_commit,
589         .finish_write       = lmdb_transaction_commit,
590         .abort_write        = lmdb_transaction_cancel,
591         .error              = lmdb_error,
592         .errorstr           = lmdb_errorstr,
593         .name               = lmdb_name,
594         .has_changed        = lmdb_changed,
595         .transaction_active = lmdb_transaction_active,
596 };
597
598 static const char *lmdb_get_path(const char *url)
599 {
600         const char *path;
601
602         /* parse the url */
603         if (strchr(url, ':')) {
604                 if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
605                         return NULL;
606                 }
607                 path = url + MDB_URL_PREFIX_SIZE;
608         } else {
609                 path = url;
610         }
611
612         return path;
613 }
614
615 static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
616 {
617         struct lmdb_trans *ltx = NULL;
618
619         /* Check if this is a forked child */
620         if (getpid() != lmdb->pid) {
621                 int fd = 0;
622                 /*
623                  * We cannot call mdb_env_close or commit any transactions,
624                  * otherwise they might appear finished in the parent.
625                  *
626                  */
627
628                 if (mdb_env_get_fd(lmdb->env, &fd) == 0) {
629                         close(fd);
630                 }
631
632                 /* Remove the pointer, so that no access should occur */
633                 lmdb->env = NULL;
634
635                 return 0;
636         }
637
638         /*
639          * Close the read transaction if it's open
640          */
641         if (lmdb->read_txn != NULL) {
642                 mdb_txn_abort(lmdb->read_txn);
643         }
644
645         if (lmdb->env == NULL) {
646                 return 0;
647         }
648
649         /*
650          * Abort any currently active transactions
651          */
652         ltx = lmdb_private_trans_head(lmdb);
653         while (ltx != NULL) {
654                 mdb_txn_abort(ltx->tx);
655                 trans_finished(lmdb, ltx);
656                 ltx = lmdb_private_trans_head(lmdb);
657         }
658         lmdb->env = NULL;
659
660         return 0;
661 }
662
663 struct mdb_env_wrap {
664         struct mdb_env_wrap *next, *prev;
665         dev_t device;
666         ino_t inode;
667         MDB_env *env;
668         pid_t pid;
669 };
670
671 static struct mdb_env_wrap *mdb_list;
672
673 /* destroy the last connection to an mdb */
674 static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
675 {
676         mdb_env_close(w->env);
677         DLIST_REMOVE(mdb_list, w);
678         return 0;
679 }
680
681 static int lmdb_open_env(TALLOC_CTX *mem_ctx,
682                          MDB_env **env,
683                          struct ldb_context *ldb,
684                          const char *path,
685                          unsigned int flags)
686 {
687         int ret;
688         const size_t mmap_size = 8LL * GIGABYTE;
689         unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
690         /*
691          * MDB_NOSUBDIR implies there is a separate file called path and a
692          * separate lockfile called path-lock
693          */
694
695         struct mdb_env_wrap *w;
696         struct stat st;
697         pid_t pid = getpid();
698         int fd = 0;
699         unsigned v;
700
701         if (stat(path, &st) == 0) {
702                 for (w=mdb_list;w;w=w->next) {
703                         if (st.st_dev == w->device &&
704                             st.st_ino == w->inode &&
705                             pid == w->pid) {
706                                 /*
707                                  * We must have only one MDB_env per process
708                                  */
709                                 if (!talloc_reference(mem_ctx, w)) {
710                                         return ldb_oom(ldb);
711                                 }
712                                 *env = w->env;
713                                 return LDB_SUCCESS;
714                         }
715                 }
716         }
717
718         w = talloc(mem_ctx, struct mdb_env_wrap);
719         if (w == NULL) {
720                 return ldb_oom(ldb);
721         }
722
723         ret = mdb_env_create(env);
724         if (ret != 0) {
725                 ldb_asprintf_errstring(
726                         ldb,
727                         "Could not create MDB environment %s: %s\n",
728                         path,
729                         mdb_strerror(ret));
730                 return ldb_mdb_err_map(ret);
731         }
732
733         /*
734          * Currently we set a 8Gb maximum database size
735          * via the constant mmap_size above
736          */
737         ret = mdb_env_set_mapsize(*env, mmap_size);
738         if (ret != 0) {
739                 ldb_asprintf_errstring(
740                         ldb,
741                         "Could not set MDB mmap() size to %llu on %s: %s\n",
742                         (unsigned long long)(mmap_size),
743                         path,
744                         mdb_strerror(ret));
745                 TALLOC_FREE(w);
746                 return ldb_mdb_err_map(ret);
747         }
748
749         mdb_env_set_maxreaders(*env, 100000);
750         /*
751          * As we ensure that there is only one MDB_env open per database per
752          * process. We can not use the MDB_RDONLY flag, as another ldb may be
753          * opened in read write mode
754          */
755         if (flags & LDB_FLG_NOSYNC) {
756                 mdb_flags |= MDB_NOSYNC;
757         }
758         ret = mdb_env_open(*env, path, mdb_flags, 0644);
759         if (ret != 0) {
760                 ldb_asprintf_errstring(ldb,
761                                 "Could not open DB %s: %s\n",
762                                 path, mdb_strerror(ret));
763                 TALLOC_FREE(w);
764                 return ldb_mdb_err_map(ret);
765         }
766
767         ret = mdb_env_get_fd(*env, &fd);
768         if (ret != 0) {
769                 ldb_asprintf_errstring(ldb,
770                                        "Could not obtain DB FD %s: %s\n",
771                                        path, mdb_strerror(ret));
772                 TALLOC_FREE(w);
773                 return ldb_mdb_err_map(ret);
774         }
775
776         /* Just as for TDB: on exec, don't inherit the fd */
777         v = fcntl(fd, F_GETFD, 0);
778         fcntl(fd, F_SETFD, v | FD_CLOEXEC);
779
780         if (fstat(fd, &st) != 0) {
781                 ldb_asprintf_errstring(
782                         ldb,
783                         "Could not stat %s:\n",
784                         path);
785                 TALLOC_FREE(w);
786                 return LDB_ERR_OPERATIONS_ERROR;
787         }
788         w->env = *env;
789         w->device = st.st_dev;
790         w->inode  = st.st_ino;
791         w->pid = pid;
792
793         talloc_set_destructor(w, mdb_env_wrap_destructor);
794
795         DLIST_ADD(mdb_list, w);
796
797         return LDB_SUCCESS;
798
799 }
800
801 static int lmdb_pvt_open(struct lmdb_private *lmdb,
802                          struct ldb_context *ldb,
803                          const char *path,
804                          unsigned int flags)
805 {
806         int ret;
807         int lmdb_max_key_length;
808
809         if (flags & LDB_FLG_DONT_CREATE_DB) {
810                 struct stat st;
811                 if (stat(path, &st) != 0) {
812                         return LDB_ERR_UNAVAILABLE;
813                 }
814         }
815
816         ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
817         if (ret != 0) {
818                 return ret;
819         }
820
821         /* Close when lmdb is released */
822         talloc_set_destructor(lmdb, lmdb_pvt_destructor);
823
824         /* Store the original pid during the LMDB open */
825         lmdb->pid = getpid();
826
827         lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env);
828
829         /* This will never happen, but if it does make sure to freak out */
830         if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
831                 return ldb_operr(ldb);
832         }
833
834         return LDB_SUCCESS;
835 }
836
837 int lmdb_connect(struct ldb_context *ldb,
838                  const char *url,
839                  unsigned int flags,
840                  const char *options[],
841                  struct ldb_module **_module)
842 {
843         const char *path = NULL;
844         struct lmdb_private *lmdb = NULL;
845         struct ldb_kv_private *ldb_kv = NULL;
846         int ret;
847
848         /*
849          * We hold locks, so we must use a private event context
850          * on each returned handle
851          */
852         ldb_set_require_private_event_context(ldb);
853
854         path = lmdb_get_path(url);
855         if (path == NULL) {
856                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
857                 return LDB_ERR_OPERATIONS_ERROR;
858         }
859
860         ldb_kv = talloc_zero(ldb, struct ldb_kv_private);
861         if (!ldb_kv) {
862                 ldb_oom(ldb);
863                 return LDB_ERR_OPERATIONS_ERROR;
864         }
865
866         lmdb = talloc_zero(ldb_kv, struct lmdb_private);
867         if (lmdb == NULL) {
868                 TALLOC_FREE(ldb_kv);
869                 return ldb_oom(ldb);
870         }
871         lmdb->ldb = ldb;
872         ldb_kv->kv_ops = &lmdb_key_value_ops;
873
874         ret = lmdb_pvt_open(lmdb, ldb, path, flags);
875         if (ret != LDB_SUCCESS) {
876                 TALLOC_FREE(ldb_kv);
877                 return ret;
878         }
879
880         ldb_kv->lmdb_private = lmdb;
881         if (flags & LDB_FLG_RDONLY) {
882                 ldb_kv->read_only = true;
883         }
884
885         /*
886          * This maximum length becomes encoded in the index values so
887          * must never change even if LMDB starts to allow longer keys.
888          * The override option is max_key_len_for_self_test, and is
889          * used for testing only.
890          */
891         ldb_kv->max_key_length = LDB_MDB_MAX_KEY_LENGTH;
892
893         return ldb_kv_init_store(
894             ldb_kv, "ldb_mdb backend", ldb, options, _module);
895 }