smbd: Store share_entries in locking.tdb again
authorVolker Lendecke <vl@samba.org>
Tue, 14 Apr 2020 14:51:15 +0000 (16:51 +0200)
committerJeremy Allison <jra@samba.org>
Fri, 15 May 2020 02:27:49 +0000 (02:27 +0000)
The "base on g_lock" patch is a slowdown because we do more tdb
operations. Getting share_entries.tdb back into locking.tdb tries to
speed things up again.

Now that we're based on g_lock we'll most likely use a different
method in order to spread the entries across multiple records.

It still maintains the sorted array of share modes within locking.tdb,
but not as part of the expensive ndr marshalling of the complete
array.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
Autobuild-User(master): Jeremy Allison <jra@samba.org>
Autobuild-Date(master): Fri May 15 02:27:49 UTC 2020 on sn-devel-184

source3/locking/share_mode_lock.c

index 262402b79956ca4b5c837801c3f0c4b56f2a63fa..adfa917df31760889836b4e16dfb9964e9d2cb90 100644 (file)
@@ -61,7 +61,6 @@
 
 /* the locking database handle */
 static struct g_lock_ctx *lock_ctx;
-static struct db_context *share_entries_db;
 
 static bool locking_init_internal(bool read_only)
 {
@@ -103,30 +102,7 @@ static bool locking_init_internal(bool read_only)
        }
        g_lock_set_lock_order(lock_ctx, DBWRAP_LOCK_ORDER_1);
 
-       db_path = lock_path(talloc_tos(), "share_entries.tdb");
-       if (db_path == NULL) {
-               TALLOC_FREE(lock_ctx);
-               return false;
-       }
-
-       share_entries_db = db_open(
-               NULL, db_path,
-               SMB_OPEN_DATABASE_TDB_HASH_SIZE,
-               TDB_DEFAULT|
-               TDB_VOLATILE|
-               TDB_CLEAR_IF_FIRST|
-               TDB_INCOMPATIBLE_HASH,
-               read_only?O_RDONLY:O_RDWR|O_CREAT, 0644,
-               DBWRAP_LOCK_ORDER_3, DBWRAP_FLAG_NONE);
-       TALLOC_FREE(db_path);
-
-       if (share_entries_db == NULL) {
-               TALLOC_FREE(lock_ctx);
-               return false;
-       }
-
        if (!posix_locking_init(read_only)) {
-               TALLOC_FREE(share_entries_db);
                TALLOC_FREE(lock_ctx);
                return False;
        }
@@ -432,6 +408,182 @@ static bool share_mode_entry_get(
        return true;
 }
 
+/*
+ * locking.tdb records consist of
+ *
+ * uint32_t share_mode_data_len
+ * uint8_t [share_mode_data]       This is struct share_mode_data in NDR
+ *
+ * 0 [SHARE_MODE_ENTRY_SIZE]       Sorted array of share modes,
+ * 1 [SHARE_MODE_ENTRY_SIZE]       filling up the rest of the data in the
+ * 2 [SHARE_MODE_ENTRY_SIZE]       g_lock.c maintained record in locking.tdb
+ */
+
+struct locking_tdb_data {
+       const uint8_t *share_mode_data_buf;
+       size_t share_mode_data_len;
+       const uint8_t *share_entries;
+       size_t num_share_entries;
+};
+
+static bool locking_tdb_data_get(
+       struct locking_tdb_data *data, const uint8_t *buf, size_t buflen)
+{
+       uint32_t share_mode_data_len, share_entries_len;
+
+       if (buflen == 0) {
+               *data = (struct locking_tdb_data) { 0 };
+               return true;
+       }
+       if (buflen < sizeof(uint32_t)) {
+               return false;
+       }
+
+       share_mode_data_len = PULL_LE_U32(buf, 0);
+
+       buf += sizeof(uint32_t);
+       buflen -= sizeof(uint32_t);
+
+       if (buflen < share_mode_data_len) {
+               return false;
+       }
+
+       share_entries_len = buflen - share_mode_data_len;
+
+       if ((share_entries_len % SHARE_MODE_ENTRY_SIZE) != 0) {
+               return false;
+       }
+
+       *data = (struct locking_tdb_data) {
+               .share_mode_data_buf = buf,
+               .share_mode_data_len = share_mode_data_len,
+               .share_entries = buf + share_mode_data_len,
+               .num_share_entries = share_entries_len / SHARE_MODE_ENTRY_SIZE,
+       };
+
+       return true;
+}
+
+struct locking_tdb_data_fetch_state {
+       TALLOC_CTX *mem_ctx;
+       uint8_t *data;
+       size_t datalen;
+};
+
+static void locking_tdb_data_fetch_fn(
+       struct server_id exclusive,
+       size_t num_shared,
+       struct server_id *shared,
+       const uint8_t *data,
+       size_t datalen,
+       void *private_data)
+{
+       struct locking_tdb_data_fetch_state *state = private_data;
+       state->datalen = datalen;
+       state->data = talloc_memdup(state->mem_ctx, data, datalen);
+}
+
+static NTSTATUS locking_tdb_data_fetch(
+       TDB_DATA key, TALLOC_CTX *mem_ctx, struct locking_tdb_data **ltdb)
+{
+       struct locking_tdb_data_fetch_state state = { 0 };
+       struct locking_tdb_data *result = NULL;
+       NTSTATUS status;
+       bool ok;
+
+       result = talloc_zero(mem_ctx, struct locking_tdb_data);
+       if (result == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       state.mem_ctx = result;
+
+       status = g_lock_dump(lock_ctx, key, locking_tdb_data_fetch_fn, &state);
+
+       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
+               /*
+                * Just return an empty record
+                */
+               goto done;
+       }
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_DEBUG("g_lock_dump failed: %s\n", nt_errstr(status));
+               return status;
+       }
+       if (state.datalen == 0) {
+               goto done;
+       }
+
+       ok = locking_tdb_data_get(result, state.data, state.datalen);
+       if (!ok) {
+               DBG_DEBUG("locking_tdb_data_get failed for %zu bytes\n",
+                         state.datalen);
+               TALLOC_FREE(result);
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+
+done:
+       *ltdb = result;
+       return NT_STATUS_OK;
+}
+
+static NTSTATUS locking_tdb_data_store(
+       TDB_DATA key,
+       const struct locking_tdb_data *ltdb,
+       const TDB_DATA *share_mode_dbufs,
+       size_t num_share_mode_dbufs)
+{
+       uint8_t share_mode_data_len_buf[4];
+       TDB_DATA dbufs[num_share_mode_dbufs+3];
+       NTSTATUS status;
+
+       if ((ltdb->share_mode_data_len == 0) &&
+           (ltdb->num_share_entries == 0) &&
+           (num_share_mode_dbufs == 0)) {
+               /*
+                * Nothing to write
+                */
+               status = g_lock_write_data(lock_ctx, key, NULL, 0);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DBG_DEBUG("g_lock_writev_data() failed: %s\n",
+                                 nt_errstr(status));
+               }
+               return status;
+       }
+
+       PUSH_LE_U32(share_mode_data_len_buf, 0, ltdb->share_mode_data_len);
+
+       dbufs[0] = (TDB_DATA) {
+               .dptr = share_mode_data_len_buf,
+               .dsize = sizeof(share_mode_data_len_buf),
+       };
+       dbufs[1] = (TDB_DATA) {
+               .dptr = discard_const_p(uint8_t, ltdb->share_mode_data_buf),
+               .dsize = ltdb->share_mode_data_len,
+       };
+
+       if (ltdb->num_share_entries > SIZE_MAX/SHARE_MODE_ENTRY_SIZE) {
+               /* overflow */
+               return NT_STATUS_BUFFER_OVERFLOW;
+       }
+       dbufs[2] = (TDB_DATA) {
+               .dptr = discard_const_p(uint8_t, ltdb->share_entries),
+               .dsize = ltdb->num_share_entries * SHARE_MODE_ENTRY_SIZE,
+       };
+
+       if (num_share_mode_dbufs != 0) {
+               memcpy(&dbufs[3],
+                      share_mode_dbufs,
+                      num_share_mode_dbufs * sizeof(TDB_DATA));
+       }
+
+       status = g_lock_writev_data(lock_ctx, key, dbufs, ARRAY_SIZE(dbufs));
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_DEBUG("g_lock_writev_data() failed: %s\n",
+                         nt_errstr(status));
+       }
+       return status;
+}
+
 /*******************************************************************
  Get all share mode entries for a dev/inode pair.
 ********************************************************************/
@@ -496,6 +648,7 @@ fail:
 static NTSTATUS share_mode_data_store(struct share_mode_data *d)
 {
        TDB_DATA key = locking_key(&d->id);
+       struct locking_tdb_data *ltdb = NULL;
        DATA_BLOB blob = { 0 };
        NTSTATUS status;
 
@@ -511,32 +664,32 @@ static NTSTATUS share_mode_data_store(struct share_mode_data *d)
 
        d->sequence_number += 1;
 
-       if (d->have_share_modes) {
+       status = locking_tdb_data_fetch(key, d, &ltdb);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+
+       if (ltdb->num_share_entries != 0) {
                enum ndr_err_code ndr_err;
 
                ndr_err = ndr_push_struct_blob(
                        &blob,
-                       d,
+                       ltdb,
                        d,
                        (ndr_push_flags_fn_t)ndr_push_share_mode_data);
                if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
                        DBG_DEBUG("ndr_push_share_mode_data failed: %s\n",
                                  ndr_errstr(ndr_err));
+                       TALLOC_FREE(ltdb);
                        return ndr_map_error2ntstatus(ndr_err);
                }
-       } else {
-               bool share_entries_exist;
-               share_entries_exist = dbwrap_exists(share_entries_db, key);
-               SMB_ASSERT(!share_entries_exist);
        }
 
-       status = g_lock_write_data(lock_ctx, key, blob.data, blob.length);
-       TALLOC_FREE(blob.data);
-       if (!NT_STATUS_IS_OK(status)) {
-               DBG_DEBUG("g_lock_write_data failed: %s\n",
-                         nt_errstr(status));
-       }
+       ltdb->share_mode_data_buf = blob.data;
+       ltdb->share_mode_data_len = blob.length;
 
+       status = locking_tdb_data_store(key, ltdb, NULL, 0);
+       TALLOC_FREE(ltdb);
        return status;
 }
 
@@ -632,9 +785,22 @@ static void get_static_share_mode_data_fn(
        void *private_data)
 {
        struct get_static_share_mode_data_state *state = private_data;
+       TDB_DATA key = locking_key(&state->id);
        struct share_mode_data *d = NULL;
+       struct locking_tdb_data ltdb = { 0 };
+
+       if (datalen != 0) {
+               bool ok;
+
+               ok = locking_tdb_data_get(&ltdb, data, datalen);
+               if (!ok) {
+                       DBG_DEBUG("locking_tdb_data_get failed\n");
+                       state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+                       return;
+               }
+       }
 
-       if (datalen == 0) {
+       if (ltdb.share_mode_data_len == 0) {
                if (state->smb_fname == NULL) {
                        state->status = NT_STATUS_NOT_FOUND;
                        return;
@@ -649,8 +815,11 @@ static void get_static_share_mode_data_fn(
                        return;
                }
        } else {
-               TDB_DATA key = locking_key(&state->id);
-               d = parse_share_modes(lock_ctx, key, data, datalen);
+               d = parse_share_modes(
+                       lock_ctx,
+                       key,
+                       ltdb.share_mode_data_buf,
+                       ltdb.share_mode_data_len);
                if (d == NULL) {
                        state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                        return;
@@ -856,12 +1025,18 @@ static void share_mode_do_locked_fn(
 {
        struct share_mode_do_locked_state *state = private_data;
        bool modified_dependent = false;
-       TDB_DATA value = {
-               .dptr = discard_const_p(uint8_t, data), .dsize = datalen,
-       };
+       struct locking_tdb_data ltdb = { 0 };
+       bool ok;
+
+       ok = locking_tdb_data_get(
+               &ltdb, discard_const_p(uint8_t, data), datalen);
+       if (!ok) {
+               DBG_WARNING("locking_tdb_data_get failed\n");
+               return;
+       }
 
-       state->fn(value.dptr,
-                 value.dsize,
+       state->fn(ltdb.share_mode_data_buf,
+                 ltdb.share_mode_data_len,
                  &modified_dependent,
                  state->private_data);
 
@@ -1027,8 +1202,17 @@ static void fetch_share_mode_unlocked_parser(
        void *private_data)
 {
        struct fetch_share_mode_unlocked_state *state = private_data;
+       struct locking_tdb_data ltdb = { 0 };
 
-       if (datalen == 0) {
+       if (datalen != 0) {
+               bool ok = locking_tdb_data_get(&ltdb, data, datalen);
+               if (!ok) {
+                       DBG_DEBUG("locking_tdb_data_get failed\n");
+                       return;
+               }
+       }
+
+       if (ltdb.share_mode_data_len == 0) {
                /* Likely a ctdb tombstone record, ignore it */
                return;
        }
@@ -1040,7 +1224,14 @@ static void fetch_share_mode_unlocked_parser(
        }
 
        state->lck->data = parse_share_modes(
-               state->lck, state->key, data, datalen);
+               state->lck,
+               state->key,
+               ltdb.share_mode_data_buf,
+               ltdb.share_mode_data_len);
+       if (state->lck->data == NULL) {
+               DBG_DEBUG("parse_share_modes failed\n");
+               TALLOC_FREE(state->lck);
+       }
 }
 
 /*******************************************************************
@@ -1147,6 +1338,20 @@ static void fetch_share_mode_fn(
 {
        struct fetch_share_mode_state *state = talloc_get_type_abort(
                private_data, struct fetch_share_mode_state);
+       struct locking_tdb_data ltdb = { 0 };
+
+       if (datalen != 0) {
+               bool ok = locking_tdb_data_get(&ltdb, data, datalen);
+               if (!ok) {
+                       DBG_DEBUG("locking_tdb_data_get failed\n");
+                       return;
+               }
+       }
+
+       if (ltdb.share_mode_data_len == 0) {
+               /* Likely a ctdb tombstone record, ignore it */
+               return;
+       }
 
        state->lck = talloc(state, struct share_mode_lock);
        if (state->lck == NULL) {
@@ -1156,7 +1361,14 @@ static void fetch_share_mode_fn(
        }
 
        state->lck->data = parse_share_modes(
-               state->lck, locking_key(&state->id), data, datalen);
+               state->lck,
+               locking_key(&state->id),
+               ltdb.share_mode_data_buf,
+               ltdb.share_mode_data_len);
+       if (state->lck->data == NULL) {
+               DBG_DEBUG("parse_share_modes failed\n");
+               TALLOC_FREE(state->lck);
+       }
 }
 
 static void fetch_share_mode_done(struct tevent_req *subreq)
@@ -1193,7 +1405,7 @@ NTSTATUS fetch_share_mode_recv(struct tevent_req *req,
                return status;
        }
 
-       if (state->lck->data == NULL) {
+       if (state->lck == NULL) {
                tevent_req_received(req);
                return NT_STATUS_NOT_FOUND;
        }
@@ -1228,6 +1440,8 @@ static void share_mode_forall_dump_fn(
 {
        struct share_mode_forall_state *state = private_data;
        struct file_id fid;
+       struct locking_tdb_data ltdb = { 0 };
+       bool ok;
        struct share_mode_data *d;
 
        if (state->key.dsize != sizeof(fid)) {
@@ -1236,7 +1450,17 @@ static void share_mode_forall_dump_fn(
        }
        memcpy(&fid, state->key.dptr, sizeof(fid));
 
-       d = parse_share_modes(talloc_tos(), state->key, data, datalen);
+       ok = locking_tdb_data_get(&ltdb, data, datalen);
+       if (!ok) {
+               DBG_DEBUG("locking_tdb_data_get() failed\n");
+               return;
+       }
+
+       d = parse_share_modes(
+               talloc_tos(),
+               state->key,
+               ltdb.share_mode_data_buf,
+               ltdb.share_mode_data_len);
        if (d == NULL) {
                DBG_DEBUG("parse_share_modes() failed\n");
                return;
@@ -1435,7 +1659,6 @@ bool share_mode_cleanup_disconnected(struct file_id fid,
        bool ret = false;
        TALLOC_CTX *frame = talloc_stackframe();
        struct file_id_buf idbuf;
-       NTSTATUS status;
        bool ok;
 
        state.lck = get_existing_share_mode_lock(frame, fid);
@@ -1505,16 +1728,6 @@ bool share_mode_cleanup_disconnected(struct file_id fid,
                  ? "" : data->stream_name,
                  open_persistent_id);
 
-       /*
-        * No connected share entries left, wipe them all
-        */
-       status = dbwrap_delete(share_entries_db, locking_key(&fid));
-       if (!NT_STATUS_IS_OK(status)) {
-               DBG_DEBUG("dbwrap_delete failed: %s\n",
-                         nt_errstr(status));
-               goto done;
-       }
-
        data->have_share_modes = false;
        data->modified = true;
 
@@ -1598,74 +1811,99 @@ static size_t share_mode_entry_find(
        return left;
 }
 
-struct set_share_mode_state {
-       struct share_mode_entry e;
-       bool created_share_mode_record;
-       NTSTATUS status;
-};
-
-static void set_share_mode_fn(
-       struct db_record *rec,
-       TDB_DATA data,
-       void *private_data)
+bool set_share_mode(struct share_mode_lock *lck,
+                   struct files_struct *fsp,
+                   uid_t uid,
+                   uint64_t mid,
+                   uint16_t op_type,
+                   uint32_t share_access,
+                   uint32_t access_mask)
 {
-       struct set_share_mode_state *state = private_data;
-       size_t idx, num_share_modes;
-       struct share_mode_entry tmp;
-       struct share_mode_entry_buf buf;
+       struct share_mode_data *d = lck->data;
+       TDB_DATA key = locking_key(&d->id);
+       struct server_id my_pid = messaging_server_id(
+               fsp->conn->sconn->msg_ctx);
+       struct locking_tdb_data *ltdb = NULL;
+       size_t idx;
+       struct share_mode_entry e = { .pid.pid = 0 };
+       struct share_mode_entry_buf e_buf;
+       NTSTATUS status;
        bool ok, found;
 
        TDB_DATA dbufs[3];
        size_t num_dbufs = 0;
 
-       if ((data.dsize % SHARE_MODE_ENTRY_SIZE) != 0) {
-               DBG_WARNING("Got invalid record size %zu\n", data.dsize);
-               state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
-               return;
-       }
-       num_share_modes = data.dsize / SHARE_MODE_ENTRY_SIZE;
-
-       ok = share_mode_entry_put(&state->e, &buf);
-       if (!ok) {
-               DBG_DEBUG("share_mode_entry_put failed\n");
-               state->status = NT_STATUS_INTERNAL_ERROR;
-               return;
+       status = locking_tdb_data_fetch(key, talloc_tos(), &ltdb);
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_DEBUG("locking_tdb_data_fetch failed: %s\n",
+                         nt_errstr(status));
+               return false;
        }
-
-       DBG_DEBUG("num_share_modes=%zu\n", num_share_modes);
+       DBG_DEBUG("num_share_modes=%zu\n", ltdb->num_share_entries);
 
        idx = share_mode_entry_find(
-               data.dptr,
-               num_share_modes,
-               state->e.pid,
-               state->e.share_file_id,
-               &tmp,
+               ltdb->share_entries,
+               ltdb->num_share_entries,
+               my_pid,
+               fsp->fh->gen_id,
+               &e,
                &found);
        if (found) {
                DBG_WARNING("Found duplicate share mode\n");
-               state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
-               return;
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto done;
+       }
+
+       e = (struct share_mode_entry) {
+               .pid = my_pid,
+               .share_access = share_access,
+               .private_options = fsp->fh->private_options,
+               .access_mask = access_mask,
+               .op_mid = mid,
+               .op_type = op_type,
+               .time.tv_sec = fsp->open_time.tv_sec,
+               .time.tv_usec = fsp->open_time.tv_usec,
+               .share_file_id = fsp->fh->gen_id,
+               .uid = (uint32_t)uid,
+               .flags = (fsp->posix_flags & FSP_POSIX_FLAGS_OPEN) ?
+                       SHARE_MODE_FLAG_POSIX_OPEN : 0,
+               .name_hash = fsp->name_hash,
+       };
+
+       if (op_type == LEASE_OPLOCK) {
+               const struct GUID *client_guid = fsp_client_guid(fsp);
+               e.client_guid = *client_guid;
+               e.lease_key = fsp->lease->lease.lease_key;
+       }
+
+       ok = share_mode_entry_put(&e, &e_buf);
+       if (!ok) {
+               DBG_DEBUG("share_mode_entry_put failed\n");
+               status = NT_STATUS_INTERNAL_ERROR;
+               goto done;
        }
 
        DBG_DEBUG("idx=%zu, found=%d\n", idx, (int)found);
 
        if (idx > 0) {
                dbufs[num_dbufs] = (TDB_DATA) {
-                       .dptr = data.dptr,
+                       .dptr = discard_const_p(uint8_t, ltdb->share_entries),
                        .dsize = idx * SHARE_MODE_ENTRY_SIZE,
                };
                num_dbufs += 1;
        }
 
        dbufs[num_dbufs] = (TDB_DATA) {
-               .dptr = buf.buf, .dsize = SHARE_MODE_ENTRY_SIZE,
+               .dptr = e_buf.buf, .dsize = SHARE_MODE_ENTRY_SIZE,
        };
        num_dbufs += 1;
 
-       if (idx < num_share_modes) {
+       if (idx < ltdb->num_share_entries) {
+               size_t num_after_idx = (ltdb->num_share_entries-idx);
                dbufs[num_dbufs] = (TDB_DATA) {
-                       .dptr = data.dptr + idx * SHARE_MODE_ENTRY_SIZE,
-                       .dsize = (num_share_modes-idx) * SHARE_MODE_ENTRY_SIZE,
+                       .dptr = discard_const_p(uint8_t, ltdb->share_entries) +
+                               idx * SHARE_MODE_ENTRY_SIZE,
+                       .dsize = num_after_idx * SHARE_MODE_ENTRY_SIZE,
                };
                num_dbufs += 1;
        }
@@ -1680,78 +1918,57 @@ static void set_share_mode_fn(
                }
        }
 
-       state->created_share_mode_record = (num_share_modes == 0);
-       state->status = dbwrap_record_storev(rec, dbufs, num_dbufs, 0);
-}
+       if (num_dbufs == 1) {
+               /*
+                * Storing a fresh record with just one share entry
+                */
+               d->have_share_modes = true;
+               d->modified = true;
+       }
 
-bool set_share_mode(struct share_mode_lock *lck,
-                   struct files_struct *fsp,
-                   uid_t uid,
-                   uint64_t mid,
-                   uint16_t op_type,
-                   uint32_t share_access,
-                   uint32_t access_mask)
-{
-       struct share_mode_data *d = lck->data;
-       struct set_share_mode_state state = {
-               .status = NT_STATUS_OK,
-               .e.pid = messaging_server_id(fsp->conn->sconn->msg_ctx),
-               .e.share_access = share_access,
-               .e.private_options = fsp->fh->private_options,
-               .e.access_mask = access_mask,
-               .e.op_mid = mid,
-               .e.op_type = op_type,
-               .e.time.tv_sec = fsp->open_time.tv_sec,
-               .e.time.tv_usec = fsp->open_time.tv_usec,
-               .e.share_file_id = fsp->fh->gen_id,
-               .e.uid = (uint32_t)uid,
-               .e.flags = (fsp->posix_flags & FSP_POSIX_FLAGS_OPEN) ?
-               SHARE_MODE_FLAG_POSIX_OPEN : 0,
-               .e.name_hash = fsp->name_hash,
-       };
-       NTSTATUS status;
+       /*
+        * If there was any existing data in
+        * ltdb->share_entries, it's now been
+        * moved and we've split it into:
+        *
+        * num_dbufs = 3
+        * dbufs[0] -> old sorted data less than new_entry
+        * dbufs[1] -> new_share_mode_entry
+        * dbufs[2] -> old sorted_data greater than new entry.
+        *
+        * So the old data inside ltdb->share_entries is
+        * no longer valid.
+        *
+        * If we're storing a brand new entry the
+        * dbufs look like:
+        *
+        * num_dbufs = 1
+        * dbufs[0] -> new_share_mode_entry
+        *
+        * Either way we must set ltdb->share_entries = NULL
+        * and ltdb->num_share_entries = 0 so that
+        * locking_tdb_data_store() doesn't use it to
+        * store any data. It's no longer there.
+        */
 
-       if (op_type == LEASE_OPLOCK) {
-               const struct GUID *client_guid = fsp_client_guid(fsp);
-               state.e.client_guid = *client_guid;
-               state.e.lease_key = fsp->lease->lease.lease_key;
-       }
+       ltdb->share_entries = NULL;
+       ltdb->num_share_entries = 0;
 
-       status = dbwrap_do_locked(
-               share_entries_db,
-               locking_key(&d->id),
-               set_share_mode_fn,
-               &state);
+       status = locking_tdb_data_store(key, ltdb, dbufs, num_dbufs);
        if (!NT_STATUS_IS_OK(status)) {
-               DBG_WARNING("dbwrap_do_locked failed: %s\n",
-                           nt_errstr(status));
-               return false;
-       }
-       if (!NT_STATUS_IS_OK(state.status)) {
-               DBG_WARNING("set_share_mode_fn failed: %s\n",
-                           nt_errstr(state.status));
-               return false;
-       }
-
-       if (state.created_share_mode_record) {
-               d->have_share_modes = true;
-               d->modified = true;
+               DBG_DEBUG("locking_tdb_data_store failed: %s\n",
+                         nt_errstr(status));
        }
-
-       return true;
+done:
+       TALLOC_FREE(ltdb);
+       return NT_STATUS_IS_OK(status);
 }
 
-struct share_mode_forall_entries_state {
-       struct share_mode_lock *lck;
+static bool share_mode_for_one_entry(
        bool (*fn)(struct share_mode_entry *e,
                   bool *modified,
-                  void *private_data);
-       void *private_data;
-       bool ok;
-};
-
-static bool share_mode_for_one_entry(
-       struct share_mode_forall_entries_state *state,
+                  void *private_data),
+       void *private_data,
        size_t *i,
        uint8_t *data,
        size_t *num_share_modes,
@@ -1785,7 +2002,7 @@ static bool share_mode_for_one_entry(
        e_pid = e.pid;
        e_share_file_id = e.share_file_id;
 
-       stop = state->fn(&e, &modified, state->private_data);
+       stop = fn(&e, &modified, private_data);
 
        DBG_DEBUG("entry[%zu]: modified=%d, e.stale=%d\n",
                  *i,
@@ -1842,100 +2059,75 @@ static bool share_mode_for_one_entry(
        return false;
 }
 
-static void share_mode_forall_entries_fn(
-       struct db_record *rec,
-       TDB_DATA data,
+bool share_mode_forall_entries(
+       struct share_mode_lock *lck,
+       bool (*fn)(struct share_mode_entry *e,
+                  bool *modified,
+                  void *private_data),
        void *private_data)
 {
-       struct share_mode_forall_entries_state *state = private_data;
-       struct share_mode_data *d = state->lck->data;
-       size_t num_share_modes;
+       struct share_mode_data *d = lck->data;
+       TDB_DATA key = locking_key(&d->id);
+       struct locking_tdb_data *ltdb = NULL;
+       uint8_t *share_entries = NULL;
+       size_t num_share_entries;
        bool writeback = false;
        NTSTATUS status;
        bool stop = false;
        size_t i;
 
-       if ((data.dsize % SHARE_MODE_ENTRY_SIZE) != 0) {
-               DBG_WARNING("Invalid data size %zu\n", data.dsize);
-               return;
+       status = locking_tdb_data_fetch(key, talloc_tos(), &ltdb);
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_DEBUG("locking_tdb_data_fetch failed: %s\n",
+                         nt_errstr(status));
+               return false;
        }
-       num_share_modes = data.dsize / SHARE_MODE_ENTRY_SIZE;
+       DBG_DEBUG("num_share_modes=%zu\n", ltdb->num_share_entries);
 
-       DBG_DEBUG("num_share_modes=%zu\n", num_share_modes);
+       num_share_entries = ltdb->num_share_entries;
+       share_entries = discard_const_p(uint8_t, ltdb->share_entries);
 
        i = 0;
-       while (i<num_share_modes) {
+       while (i<num_share_entries) {
                stop = share_mode_for_one_entry(
-                       state, &i, data.dptr, &num_share_modes, &writeback);
+                       fn,
+                       private_data,
+                       &i,
+                       share_entries,
+                       &num_share_entries,
+                       &writeback);
                if (stop) {
                        break;
                }
        }
 
-       DBG_DEBUG("num_share_modes=%zu, writeback=%d\n",
-                 num_share_modes,
+       DBG_DEBUG("num_share_entries=%zu, writeback=%d\n",
+                 num_share_entries,
                  (int)writeback);
 
        if (!writeback) {
-               state->ok = true;
-               return;
-       }
-
-       if (num_share_modes == 0) {
-               if (data.dsize != 0) {
-                       d->have_share_modes = false;
-                       d->modified = true;
-               }
-               status = dbwrap_record_delete(rec);
-       } else {
-               TDB_DATA value = {
-                       .dptr = data.dptr,
-                       .dsize = num_share_modes * SHARE_MODE_ENTRY_SIZE,
-               };
-               status = dbwrap_record_store(rec, value, 0);
+               return true;
        }
 
-       if (!NT_STATUS_IS_OK(status)) {
-               DBG_DEBUG("Storing record with %zu entries failed: %s\n",
-                         num_share_modes,
-                         nt_errstr(status));
-               return;
+       if ((ltdb->num_share_entries != 0 ) && (num_share_entries == 0)) {
+               /*
+                * This routine wiped all share entries
+                */
+               d->have_share_modes = false;
+               d->modified = true;
        }
 
+       ltdb->num_share_entries = num_share_entries;
+       ltdb->share_entries = share_entries;
 
-       state->ok = true;
-}
-
-bool share_mode_forall_entries(
-       struct share_mode_lock *lck,
-       bool (*fn)(struct share_mode_entry *e,
-                  bool *modified,
-                  void *private_data),
-       void *private_data)
-{
-       struct share_mode_forall_entries_state state = {
-               .lck = lck,
-               .fn = fn,
-               .private_data = private_data,
-       };
-       NTSTATUS status;
-
-       status = dbwrap_do_locked(
-               share_entries_db,
-               locking_key(&lck->data->id),
-               share_mode_forall_entries_fn,
-               &state);
-       if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
-               status = NT_STATUS_OK;
-               state.ok = true;
-       }
+       status = locking_tdb_data_store(key, ltdb, NULL, 0);
        if (!NT_STATUS_IS_OK(status)) {
-               DBG_DEBUG("dbwrap_do_locked failed: %s\n",
+               DBG_DEBUG("locking_tdb_data_store failed: %s\n",
                          nt_errstr(status));
                return false;
        }
 
-       return state.ok;
+       return true;
 }
 
 struct share_mode_count_entries_state {
@@ -1944,16 +2136,24 @@ struct share_mode_count_entries_state {
 };
 
 static void share_mode_count_entries_fn(
-       TDB_DATA key, TDB_DATA data, void *private_data)
+       struct server_id exclusive,
+       size_t num_shared,
+       struct server_id *shared,
+       const uint8_t *data,
+       size_t datalen,
+       void *private_data)
 {
        struct share_mode_count_entries_state *state = private_data;
+       struct locking_tdb_data ltdb = { 0 };
+       bool ok;
 
-       if ((data.dsize % SHARE_MODE_ENTRY_SIZE) != 0) {
-               DBG_WARNING("Invalid data size %zu\n", data.dsize);
+       ok = locking_tdb_data_get(&ltdb, data, datalen);
+       if (!ok) {
+               DBG_WARNING("locking_tdb_data_get failed for %zu\n", datalen);
                state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
                return;
        }
-       state->num_share_modes = data.dsize / SHARE_MODE_ENTRY_SIZE;
+       state->num_share_modes = ltdb.num_share_entries;
        state->status = NT_STATUS_OK;
 }
 
@@ -1964,18 +2164,18 @@ NTSTATUS share_mode_count_entries(struct file_id fid, size_t *num_share_modes)
        };
        NTSTATUS status;
 
-       status = dbwrap_parse_record(
-               share_entries_db,
+       status = g_lock_dump(
+               lock_ctx,
                locking_key(&fid),
                share_mode_count_entries_fn,
                &state);
        if (!NT_STATUS_IS_OK(status)) {
-               DBG_DEBUG("dbwrap_parse_record failed: %s\n",
+               DBG_DEBUG("g_lock_dump failed: %s\n",
                          nt_errstr(status));
                return status;
        }
        if (!NT_STATUS_IS_OK(state.status)) {
-               DBG_DEBUG("share_mode_forall_entries_fn failed: %s\n",
+               DBG_DEBUG("share_mode_count_entries_fn failed: %s\n",
                          nt_errstr(state.status));
                return state.status;
        }
@@ -1984,169 +2184,108 @@ NTSTATUS share_mode_count_entries(struct file_id fid, size_t *num_share_modes)
        return NT_STATUS_OK;
 }
 
-struct share_mode_entry_do_state {
-       struct server_id pid;
-       uint64_t share_file_id;
+static bool share_mode_entry_do(
+       struct share_mode_lock *lck,
+       struct server_id pid,
+       uint64_t share_file_id,
        void (*fn)(struct share_mode_entry *e,
                   size_t num_share_modes,
                   bool *modified,
-                  void *private_data);
-       void *private_data;
-       size_t num_share_modes;
-       NTSTATUS status;
-};
-
-static void share_mode_entry_do_fn(
-       struct db_record *rec,
-       TDB_DATA data,
+                  void *private_data),
        void *private_data)
 {
-       struct share_mode_entry_do_state *state = private_data;
+       struct share_mode_data *d = lck->data;
+       TDB_DATA key = locking_key(&d->id);
+       struct locking_tdb_data *ltdb = NULL;
        size_t idx;
        bool found = false;
        bool modified;
        struct share_mode_entry e;
-       struct share_mode_entry_buf buf;
-       TDB_DATA dbufs[3];
-       size_t num_dbufs = 0;
+       uint8_t *e_ptr = NULL;
+       bool have_share_modes;
+       NTSTATUS status;
+       bool ret = false;
 
-       if ((data.dsize % SHARE_MODE_ENTRY_SIZE) != 0) {
-               DBG_WARNING("Invalid data size %zu\n", data.dsize);
-               state->status = NT_STATUS_INTERNAL_DB_CORRUPTION;
-               return;
+       status = locking_tdb_data_fetch(key, talloc_tos(), &ltdb);
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_DEBUG("locking_tdb_data_fetch failed: %s\n",
+                         nt_errstr(status));
+               return false;
        }
-       state->num_share_modes = data.dsize / SHARE_MODE_ENTRY_SIZE;
-
-       DBG_DEBUG("state->num_share_modes=%zu\n", state->num_share_modes);
+       DBG_DEBUG("num_share_modes=%zu\n", ltdb->num_share_entries);
 
        idx = share_mode_entry_find(
-               data.dptr,
-               state->num_share_modes,
-               state->pid,
-               state->share_file_id,
+               ltdb->share_entries,
+               ltdb->num_share_entries,
+               pid,
+               share_file_id,
                &e,
                &found);
        if (!found) {
                DBG_WARNING("Did not find share mode entry for %"PRIu64"\n",
-                           state->share_file_id);
-               state->status = NT_STATUS_NOT_FOUND;
-               return;
+                           share_file_id);
+               goto done;
        }
 
-       state->fn(&e, state->num_share_modes, &modified, state->private_data);
+       fn(&e, ltdb->num_share_entries, &modified, private_data);
 
        if (!e.stale && !modified) {
-               state->status = NT_STATUS_OK;
-               return;
+               ret = true;
+               goto done;
        }
 
-       if (idx > 0) {
-               dbufs[num_dbufs] = (TDB_DATA) {
-                       .dptr = data.dptr,
-                       .dsize = idx * SHARE_MODE_ENTRY_SIZE,
-               };
-               num_dbufs += 1;
-       }
+       e_ptr = discard_const_p(uint8_t, ltdb->share_entries) +
+               idx * SHARE_MODE_ENTRY_SIZE;
 
-       if (!e.stale) {
+       if (e.stale) {
+               /*
+                * Move the reset down one entry
+                */
+
+               size_t behind = ltdb->num_share_entries - idx - 1;
+               if (behind != 0) {
+                       memmove(e_ptr,
+                               e_ptr + SHARE_MODE_ENTRY_SIZE,
+                               behind * SHARE_MODE_ENTRY_SIZE);
+               }
+               ltdb->num_share_entries -= 1;
+       } else {
+               struct share_mode_entry_buf buf;
                bool ok;
 
-               if (state->num_share_modes != 1) {
+               if (ltdb->num_share_entries != 1) {
                        /*
                         * Make sure the sorting order stays intact
                         */
-                       SMB_ASSERT(server_id_equal(&e.pid, &state->pid));
-                       SMB_ASSERT(e.share_file_id == state->share_file_id);
+                       SMB_ASSERT(server_id_equal(&e.pid, &pid));
+                       SMB_ASSERT(e.share_file_id == share_file_id);
                }
 
                ok = share_mode_entry_put(&e, &buf);
                if (!ok) {
                        DBG_DEBUG("share_mode_entry_put failed\n");
-                       state->status = NT_STATUS_INTERNAL_ERROR;
-                       return;
-               }
-
-               dbufs[num_dbufs] = (TDB_DATA) {
-                       .dptr = buf.buf, .dsize = SHARE_MODE_ENTRY_SIZE,
-               };
-               num_dbufs += 1;
-       }
-
-       idx += 1;
-
-       if (idx < state->num_share_modes) {
-               size_t behind = state->num_share_modes - idx;
-               dbufs[num_dbufs] = (TDB_DATA) {
-                       .dptr = data.dptr + idx * SHARE_MODE_ENTRY_SIZE,
-                       .dsize = behind * SHARE_MODE_ENTRY_SIZE,
-               };
-               num_dbufs += 1;
-       }
-
-       if (e.stale) {
-               state->num_share_modes -= 1;
-       }
-
-       if (state->num_share_modes == 0) {
-               state->status = dbwrap_record_delete(rec);
-               if (!NT_STATUS_IS_OK(state->status)) {
-                       DBG_DEBUG("dbwrap_record_delete failed: %s\n",
-                                 nt_errstr(state->status));
+                       goto done;
                }
-               return;
-       }
-
-       state->status = dbwrap_record_storev(rec, dbufs, num_dbufs, 0);
-       if (!NT_STATUS_IS_OK(state->status)) {
-               DBG_DEBUG("dbwrap_record_storev failed: %s\n",
-                         nt_errstr(state->status));
-               return;
+               memcpy(e_ptr, buf.buf, SHARE_MODE_ENTRY_SIZE);
        }
-}
-
-static bool share_mode_entry_do(
-       struct share_mode_lock *lck,
-       struct server_id pid,
-       uint64_t share_file_id,
-       void (*fn)(struct share_mode_entry *e,
-                  size_t num_share_modes,
-                  bool *modified,
-                  void *private_data),
-       void *private_data)
-{
-       struct share_mode_data *d = lck->data;
-       struct share_mode_entry_do_state state = {
-               .pid = pid,
-               .share_file_id = share_file_id,
-               .fn = fn,
-               .private_data = private_data,
-       };
-       NTSTATUS status;
-       bool have_share_modes;
 
-       status = dbwrap_do_locked(
-               share_entries_db,
-               locking_key(&d->id),
-               share_mode_entry_do_fn,
-               &state);
+       status = locking_tdb_data_store(key, ltdb, NULL, 0);
        if (!NT_STATUS_IS_OK(status)) {
-               DBG_DEBUG("share_mode_forall_entries failed: %s\n",
-                         nt_errstr(status));
-               return false;
-       }
-       if (!NT_STATUS_IS_OK(state.status)) {
-               DBG_DEBUG("share_mode_entry_do_fn failed: %s\n",
+               DBG_DEBUG("locking_tdb_data_store failed: %s\n",
                          nt_errstr(status));
-               return false;
+               goto done;
        }
 
-       have_share_modes = (state.num_share_modes != 0);
+       have_share_modes = (ltdb->num_share_entries != 0);
        if (d->have_share_modes != have_share_modes) {
                d->have_share_modes = have_share_modes;
                d->modified = true;
        }
 
-       return true;
+       ret = true;
+done:
+       TALLOC_FREE(ltdb);
+       return ret;
 }
 
 struct del_share_mode_state {
@@ -2338,107 +2477,76 @@ bool mark_share_mode_disconnected(struct share_mode_lock *lck,
        return true;
 }
 
-struct reset_share_mode_entry_state {
-       struct server_id old_pid;
-       uint64_t old_share_file_id;
-       struct server_id new_pid;
-       uint64_t new_mid;
-       uint64_t new_share_file_id;
-       bool ok;
-};
-
-static void reset_share_mode_entry_fn(
-       struct db_record *rec, TDB_DATA value, void *private_data)
+bool reset_share_mode_entry(
+       struct share_mode_lock *lck,
+       struct server_id old_pid,
+       uint64_t old_share_file_id,
+       struct server_id new_pid,
+       uint64_t new_mid,
+       uint64_t new_share_file_id)
 {
-       struct reset_share_mode_entry_state *state = private_data;
+       struct share_mode_data *d = lck->data;
+       TDB_DATA key = locking_key(&d->id);
+       struct locking_tdb_data *ltdb = NULL;
        struct share_mode_entry e;
        struct share_mode_entry_buf e_buf;
-       TDB_DATA newval = { .dptr = e_buf.buf, .dsize = sizeof(e_buf.buf) };
        NTSTATUS status;
-       int ret;
+       bool ret = false;
        bool ok;
 
-       if (value.dsize != SHARE_MODE_ENTRY_SIZE) {
-               DBG_WARNING("Expect one entry, got %zu bytes\n",
-                           value.dsize);
-               return;
+       status = locking_tdb_data_fetch(key, talloc_tos(), &ltdb);
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_DEBUG("locking_tdb_data_fetch failed: %s\n",
+                         nt_errstr(status));
+               return false;
+       }
+
+       if (ltdb->num_share_entries != 1) {
+               DBG_DEBUG("num_share_modes=%zu\n", ltdb->num_share_entries);
+               goto done;
        }
 
-       ok = share_mode_entry_get(value.dptr, &e);
+       ok = share_mode_entry_get(ltdb->share_entries, &e);
        if (!ok) {
                DBG_WARNING("share_mode_entry_get failed\n");
-               return;
+               goto done;
        }
 
        ret = share_mode_entry_cmp(
-               state->old_pid,
-               state->old_share_file_id,
-               e.pid,
-               e.share_file_id);
+               old_pid, old_share_file_id, e.pid, e.share_file_id);
        if (ret != 0) {
                struct server_id_buf tmp1, tmp2;
                DBG_WARNING("Expected pid=%s, file_id=%"PRIu64", "
                            "got pid=%s, file_id=%"PRIu64"\n",
-                           server_id_str_buf(state->old_pid, &tmp1),
-                           state->old_share_file_id,
+                           server_id_str_buf(old_pid, &tmp1),
+                           old_share_file_id,
                            server_id_str_buf(e.pid, &tmp2),
                            e.share_file_id);
-               return;
+               goto done;
        }
 
-       e.pid = state->new_pid;
-       e.op_mid = state->new_mid;
-       e.share_file_id = state->new_share_file_id;
+       e.pid = new_pid;
+       e.op_mid = new_mid;
+       e.share_file_id = new_share_file_id;
 
        ok = share_mode_entry_put(&e, &e_buf);
        if (!ok) {
                DBG_WARNING("share_mode_entry_put failed\n");
-               return;
-       }
-
-       status = dbwrap_record_store(rec, newval, 0);
-       if (!NT_STATUS_IS_OK(status)) {
-               DBG_WARNING("dbwrap_record_store failed: %s\n",
-                           nt_errstr(status));
-               return;
+               goto done;
        }
 
-       state->ok = true;
-}
-
-bool reset_share_mode_entry(
-       struct share_mode_lock *lck,
-       struct server_id old_pid,
-       uint64_t old_share_file_id,
-       struct server_id new_pid,
-       uint64_t new_mid,
-       uint64_t new_share_file_id)
-{
-       struct share_mode_data *d = lck->data;
-       struct reset_share_mode_entry_state state = {
-               .old_pid = old_pid,
-               .old_share_file_id = old_share_file_id,
-               .new_pid = new_pid,
-               .new_mid = new_mid,
-               .new_share_file_id = new_share_file_id,
-       };
-       NTSTATUS status;
+       ltdb->share_entries = e_buf.buf;
 
-       status = dbwrap_do_locked(
-               share_entries_db,
-               locking_key(&d->id),
-               reset_share_mode_entry_fn,
-               &state);
+       status = locking_tdb_data_store(key, ltdb, NULL, 0);
        if (!NT_STATUS_IS_OK(status)) {
-               DBG_WARNING("dbwrap_do_locked failed: %s\n",
-                           nt_errstr(status));
-               return false;
-       }
-       if (!state.ok) {
-               DBG_WARNING("reset_share_mode_fn failed\n");
-               return false;
+               DBG_DEBUG("locking_tdb_data_store failed: %s\n",
+                         nt_errstr(status));
+               goto done;
        }
-       d->have_share_modes = true;
 
+       d->have_share_modes = true;
+       ret = true;
+done:
+       TALLOC_FREE(ltdb);
        return true;
 }