s3: Enforce a lock order in dbwrap
[metze/samba/wip.git] / source3 / lib / dbwrap / dbwrap.c
index af8e28cb7e5a67c2a4fc5e115b124bc61cc6894a..e443fa5eb3481d6d6f737088261da3498f82335f 100644 (file)
@@ -1,20 +1,20 @@
-/* 
+/*
    Unix SMB/CIFS implementation.
    Database interface wrapper
    Copyright (C) Jim McDonough <jmcd@us.ibm.com> 2006
 
    Major code contributions from Aleksey Fedoseev (fedoseev@ru.ibm.com)
-   
+
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.
-   
+
    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.
-   
+
    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 #include "util_tdb.h"
 
 /*
- * Fall back using fetch_locked if no genuine fetch operation is provided
+ * Fall back using fetch if no genuine exists operation is provided
  */
 
-int dbwrap_fallback_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
-                         TDB_DATA key, TDB_DATA *data)
+static int dbwrap_fallback_exists(struct db_context *db, TDB_DATA key)
 {
-       struct db_record *rec;
-
-       if (!(rec = db->fetch_locked(db, mem_ctx, key))) {
-               return -1;
-       }
+       NTSTATUS status = dbwrap_parse_record(db, key, NULL, NULL);
+       return NT_STATUS_IS_OK(status) ? 1 : 0;
+}
 
-       data->dsize = rec->value.dsize;
-       data->dptr = talloc_move(mem_ctx, &rec->value.dptr);
-       TALLOC_FREE(rec);
-       return 0;
+static int delete_record(struct db_record *rec, void *data)
+{
+       NTSTATUS status = dbwrap_record_delete(rec);
+       return NT_STATUS_IS_OK(status) ? 0 : -1;
 }
 
 /*
- * Fall back using fetch if no genuine exists operation is provided
+ * Fallback wipe implementation using traverse and delete if no genuine
+ * wipe operation is provided
  */
-
-static int dbwrap_fallback_exists(struct db_context *db, TDB_DATA key)
+static int dbwrap_fallback_wipe(struct db_context *db)
 {
-       int res = dbwrap_parse_record(db, key, NULL, NULL);
-       return  ( res == -1) ? 0 : 1;
+       NTSTATUS status = dbwrap_trans_traverse(db, delete_record, NULL);
+       return NT_STATUS_IS_OK(status) ? 0 : -1;
 }
 
+
 /*
- * Fall back using fetch if no genuine parse operation is provided
+ * Wrapper functions for the backend methods
  */
 
-int dbwrap_fallback_parse_record(struct db_context *db, TDB_DATA key,
-                                int (*parser)(TDB_DATA key,
-                                              TDB_DATA data,
-                                              void *private_data),
-                                void *private_data)
+TDB_DATA dbwrap_record_get_key(const struct db_record *rec)
 {
-       TDB_DATA data;
-       int res;
+       return rec->key;
+}
+
+TDB_DATA dbwrap_record_get_value(const struct db_record *rec)
+{
+       return rec->value;
+}
+
+NTSTATUS dbwrap_record_store(struct db_record *rec, TDB_DATA data, int flags)
+{
+       return rec->store(rec, data, flags);
+}
+
+NTSTATUS dbwrap_record_delete(struct db_record *rec)
+{
+       return rec->delete_rec(rec);
+}
+
+struct dbwrap_lock_order_state {
+       uint8_t *plock_order_mask;
+       uint8_t bitmask;
+};
+
+static int dbwrap_lock_order_state_destructor(
+       struct dbwrap_lock_order_state *s)
+{
+       *s->plock_order_mask &= ~s->bitmask;
+       return 0;
+}
+
+static struct dbwrap_lock_order_state *dbwrap_check_lock_order(
+       struct db_context *db, TALLOC_CTX *mem_ctx)
+{
+       /*
+        * Store the lock_order of currently locked records as bits in
+        * "lock_order_mask". We only use levels 1,2,3 right now, so a
+        * single uint8_t is enough.
+        */
+       static uint8_t lock_order_mask;
+
+       struct dbwrap_lock_order_state *state;
+       uint8_t idx;
+       int used;
+
+       if (db->lock_order == 0) {
+               /*
+                * lock order 0 is for example for dbwrap_rbt without
+                * real locking. Return state nevertheless to avoid
+                * special cases.
+                */
+               return talloc(mem_ctx, struct dbwrap_lock_order_state);
+       }
+
+       /*
+        * We fill bits from the high bits, to be able to use
+        * "ffs(lock_order_mask)"
+        */
+       idx = sizeof(lock_order_mask)*8 - db->lock_order;
+
+       used = ffs(lock_order_mask);
+
+       DEBUG(1, ("used=%d, lock_order=%d, idx=%d\n", used,
+                 (int)db->lock_order, (int)idx));
+
+       if ((used != 0) && (used-1 <= idx)) {
+               DEBUG(0, ("Lock order violation: Trying %d, order_mask=%x\n",
+                         (int)db->lock_order, (int)lock_order_mask));
+               return NULL;
+       }
 
-       res = db->fetch(db, talloc_tos(), key, &data);
-       if (res != 0) {
-               return -1;
+       state = talloc(mem_ctx, struct dbwrap_lock_order_state);
+       if (state == NULL) {
+               DEBUG(1, ("talloc failed\n"));
+               return NULL;
        }
+       state->bitmask = 1 << idx;
+       state->plock_order_mask = &lock_order_mask;
+
+       talloc_set_destructor(state, dbwrap_lock_order_state_destructor);
+       lock_order_mask |= state->bitmask;
+
+       return state;
+}
+
+struct db_record *dbwrap_fetch_locked(struct db_context *db,
+                                     TALLOC_CTX *mem_ctx,
+                                     TDB_DATA key)
+{
+       struct db_record *rec;
+       struct dbwrap_lock_order_state *lock_order;
 
-       res = parser(key, data, private_data);
-       TALLOC_FREE(data.dptr);
-       return res;
+       lock_order = dbwrap_check_lock_order(db, talloc_tos());
+       if (lock_order == NULL) {
+               return NULL;
+       }
+       rec = db->fetch_locked(db, mem_ctx, key);
+       if (rec == NULL) {
+               TALLOC_FREE(lock_order);
+               return NULL;
+       }
+       (void)talloc_steal(rec, lock_order);
+       return rec;
 }
 
+struct dbwrap_fetch_state {
+       TALLOC_CTX *mem_ctx;
+       TDB_DATA data;
+};
+
+static void dbwrap_fetch_parser(TDB_DATA key, TDB_DATA data,
+                               void *private_data)
+{
+       struct dbwrap_fetch_state *state =
+               (struct dbwrap_fetch_state *)private_data;
+
+       state->data.dsize = data.dsize;
+       state->data.dptr = (uint8_t *)talloc_memdup(state->mem_ctx, data.dptr,
+                                                   data.dsize);
+}
 
-TDB_DATA dbwrap_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
-                     TDB_DATA key)
+NTSTATUS dbwrap_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
+                     TDB_DATA key, TDB_DATA *value)
 {
-       TDB_DATA result;
+       struct dbwrap_fetch_state state;
+       NTSTATUS status;
 
-       if (db->fetch(db, mem_ctx, key, &result) != 0) {
-               return make_tdb_data(NULL, 0);
+       if (value == NULL) {
+               return NT_STATUS_INVALID_PARAMETER;
        }
 
-       return result;
+       state.mem_ctx = mem_ctx;
+
+       status = dbwrap_parse_record(db, key, dbwrap_fetch_parser, &state);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+       if ((state.data.dsize != 0) && (state.data.dptr == NULL)) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       *value = state.data;
+       return NT_STATUS_OK;
 }
 
 bool dbwrap_exists(struct db_context *db, TDB_DATA key)
@@ -106,12 +217,12 @@ NTSTATUS dbwrap_store(struct db_context *db, TDB_DATA key,
        struct db_record *rec;
        NTSTATUS status;
 
-       rec = db->fetch_locked(db, talloc_tos(), key);
+       rec = dbwrap_fetch_locked(db, talloc_tos(), key);
        if (rec == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
 
-       status = rec->store(rec, data, flags);
+       status = dbwrap_record_store(rec, data, flags);
        TALLOC_FREE(rec);
        return status;
 }
@@ -121,11 +232,11 @@ NTSTATUS dbwrap_delete(struct db_context *db, TDB_DATA key)
        struct db_record *rec;
        NTSTATUS status;
 
-       rec = db->fetch_locked(db, talloc_tos(), key);
+       rec = dbwrap_fetch_locked(db, talloc_tos(), key);
        if (rec == NULL) {
                return NT_STATUS_NO_MEMORY;
        }
-       status = rec->delete_rec(rec);
+       status = dbwrap_record_delete(rec);
        TALLOC_FREE(rec);
        return status;
 }
@@ -166,23 +277,51 @@ NTSTATUS dbwrap_traverse_read(struct db_context *db,
        return NT_STATUS_OK;
 }
 
-static int dbwrap_null_parser(TDB_DATA key, TDB_DATA val, void* data)
+static void dbwrap_null_parser(TDB_DATA key, TDB_DATA val, void* data)
 {
-       return 0;
+       return;
 }
 
-int dbwrap_parse_record(struct db_context *db, TDB_DATA key,
-                       int (*parser)(TDB_DATA key, TDB_DATA data,
-                                     void *private_data),
-                       void *private_data)
+NTSTATUS dbwrap_parse_record(struct db_context *db, TDB_DATA key,
+                            void (*parser)(TDB_DATA key, TDB_DATA data,
+                                           void *private_data),
+                            void *private_data)
 {
        if (parser == NULL) {
                parser = dbwrap_null_parser;
        }
+       return db->parse_record(db, key, parser, private_data);
+}
 
-       if (db->parse_record) {
-               return db->parse_record(db, key, parser, private_data);
-       } else {
-               return dbwrap_fallback_parse_record(db, key, parser, private_data);
+int dbwrap_wipe(struct db_context *db)
+{
+       if (db->wipe == NULL) {
+               return dbwrap_fallback_wipe(db);
        }
+       return db->wipe(db);
+}
+
+int dbwrap_get_seqnum(struct db_context *db)
+{
+       return db->get_seqnum(db);
+}
+
+int dbwrap_get_flags(struct db_context *db)
+{
+       return db->get_flags(db);
+}
+
+int dbwrap_transaction_start(struct db_context *db)
+{
+       return db->transaction_start(db);
+}
+
+int dbwrap_transaction_commit(struct db_context *db)
+{
+       return db->transaction_commit(db);
+}
+
+int dbwrap_transaction_cancel(struct db_context *db)
+{
+       return db->transaction_cancel(db);
 }