ldb_mdb: Wrap mdb_env_open
authorGary Lockyer <gary@catalyst.net.nz>
Tue, 6 Mar 2018 23:05:34 +0000 (12:05 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 23 May 2018 00:27:11 +0000 (02:27 +0200)
Wrap mdb_env_open to ensure that we only have one MDB_env opened per
database in each process

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
lib/ldb/ldb_mdb/ldb_mdb.c

index fa81d9bd61e9c843239e3b73cf5b213cc291028f..b9192fe0b46f7db5a60beb14568412a0d95935a4 100644 (file)
@@ -649,47 +649,81 @@ static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
                trans_finished(lmdb, ltx);
                ltx = lmdb_private_trans_head(lmdb);
        }
-
-       mdb_env_close(lmdb->env);
        lmdb->env = NULL;
 
        return 0;
 }
 
-static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
+struct mdb_env_wrap {
+       struct mdb_env_wrap *next, *prev;
+       dev_t device;
+       ino_t inode;
+       MDB_env *env;
+       int pid;
+};
+
+static struct mdb_env_wrap *mdb_list;
+
+/* destroy the last connection to an mdb */
+static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
+{
+       mdb_env_close(w->env);
+       DLIST_REMOVE(mdb_list, w);
+       return 0;
+}
+
+static int lmdb_open_env(TALLOC_CTX *mem_ctx,
+                        MDB_env **env,
                         struct ldb_context *ldb,
                         const char *path,
-                        unsigned int flags,
-                        struct lmdb_private *lmdb)
+                        unsigned int flags)
 {
        int ret;
-       unsigned int mdb_flags;
        const size_t mmap_size = 8LL * GIGABYTE;
-       int lmdb_max_key_length;
-
-       if (flags & LDB_FLG_DONT_CREATE_DB) {
-               struct stat st;
+       unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
+       /*
+        * MDB_NOSUBDIR implies there is a separate file called path and a
+        * separate lockfile called path-lock
+        */
 
-               if (stat(path, &st) != 0) {
-                       return LDB_ERR_UNAVAILABLE;
+       struct mdb_env_wrap *w;
+       struct stat st;
+
+       if (stat(path, &st) == 0) {
+               for (w=mdb_list;w;w=w->next) {
+                       if (st.st_dev == w->device && st.st_ino == w->inode) {
+                               /*
+                                * We must have only one MDB_env per process
+                                */
+                               if (!talloc_reference(mem_ctx, w)) {
+                                       return ldb_oom(ldb);
+                               }
+                               *env = w->env;
+                               return LDB_SUCCESS;
+                       }
                }
        }
 
-       ret = mdb_env_create(&lmdb->env);
+       w = talloc(mem_ctx, struct mdb_env_wrap);
+       if (w == NULL) {
+               return ldb_oom(ldb);
+       }
+
+       ret = mdb_env_create(env);
        if (ret != 0) {
                ldb_asprintf_errstring(
                        ldb,
                        "Could not create MDB environment %s: %s\n",
                        path,
                        mdb_strerror(ret));
-               return LDB_ERR_OPERATIONS_ERROR;
+               return ldb_mdb_err_map(ret);
        }
 
-       /* Close when lmdb is released */
-       talloc_set_destructor(lmdb, lmdb_pvt_destructor);
-
-       ret = mdb_env_set_mapsize(lmdb->env,
-                                 mmap_size);
+       /*
+        * Currently we set a 8Gb maximum database size
+        * via the constant mmap_size above
+        */
+       ret = mdb_env_set_mapsize(*env, mmap_size);
        if (ret != 0) {
                ldb_asprintf_errstring(
                        ldb,
@@ -697,29 +731,71 @@ static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
                        (unsigned long long)(mmap_size),
                        path,
                        mdb_strerror(ret));
+               TALLOC_FREE(w);
                return ldb_mdb_err_map(ret);
        }
 
-       mdb_env_set_maxreaders(lmdb->env, 100000);
-       /* MDB_NOSUBDIR implies there is a separate file called path and a
-        * separate lockfile called path-lock
+       mdb_env_set_maxreaders(*env, 100000);
+       /*
+        * As we ensure that there is only one MDB_env open per database per
+        * process. We can not use the MDB_RDONLY flag, as another ldb may be
+        * opened in read write mode
         */
-       mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
-       if (flags & LDB_FLG_RDONLY) {
-               mdb_flags |= MDB_RDONLY;
-       }
        if (flags & LDB_FLG_NOSYNC) {
                mdb_flags |= MDB_NOSYNC;
        }
-       ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
+       ret = mdb_env_open(*env, path, mdb_flags, 0644);
        if (ret != 0) {
                ldb_asprintf_errstring(ldb,
                                "Could not open DB %s: %s\n",
                                path, mdb_strerror(ret));
-               talloc_free(lmdb);
+               TALLOC_FREE(w);
                return ldb_mdb_err_map(ret);
        }
 
+       if (stat(path, &st) != 0) {
+               ldb_asprintf_errstring(
+                       ldb,
+                       "Could not stat %s:\n",
+                       path);
+               TALLOC_FREE(w);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       w->env = *env;
+       w->device = st.st_dev;
+       w->inode  = st.st_ino;
+
+       talloc_set_destructor(w, mdb_env_wrap_destructor);
+
+       DLIST_ADD(mdb_list, w);
+
+       return LDB_SUCCESS;
+
+}
+
+static int lmdb_pvt_open(struct lmdb_private *lmdb,
+                        struct ldb_context *ldb,
+                        const char *path,
+                        unsigned int flags)
+{
+       int ret;
+       int lmdb_max_key_length;
+
+       if (flags & LDB_FLG_DONT_CREATE_DB) {
+               struct stat st;
+               if (stat(path, &st) != 0) {
+                       return LDB_ERR_UNAVAILABLE;
+               }
+       }
+
+       ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
+       if (ret != 0) {
+               return ret;
+       }
+
+       /* Close when lmdb is released */
+       talloc_set_destructor(lmdb, lmdb_pvt_destructor);
+
        /* Store the original pid during the LMDB open */
        lmdb->pid = getpid();
 
@@ -727,7 +803,6 @@ static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
 
        /* This will never happen, but if it does make sure to freak out */
        if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
-               talloc_free(lmdb);
                return ldb_operr(ldb);
        }
 
@@ -763,17 +838,17 @@ int lmdb_connect(struct ldb_context *ldb,
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       lmdb = talloc_zero(ldb, struct lmdb_private);
+       lmdb = talloc_zero(ltdb, struct lmdb_private);
        if (lmdb == NULL) {
                TALLOC_FREE(ltdb);
-               ldb_oom(ldb);
-               return LDB_ERR_OPERATIONS_ERROR;
+               return ldb_oom(ldb);
        }
        lmdb->ldb = ldb;
        ltdb->kv_ops = &lmdb_key_value_ops;
 
-       ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
+       ret = lmdb_pvt_open(lmdb, ldb, path, flags);
        if (ret != LDB_SUCCESS) {
+               TALLOC_FREE(ltdb);
                return ret;
        }