s3: Enforce a lock order in dbwrap
authorVolker Lendecke <vl@samba.org>
Sun, 8 Jan 2012 18:04:39 +0000 (19:04 +0100)
committerVolker Lendecke <vlendec@samba.org>
Wed, 18 Jan 2012 13:48:04 +0000 (14:48 +0100)
This makes sure we do not deadlock from doing two dbwrap_fetch_locked in two
processes in different orders. At open time, we assign a strict order to all
databases. lock_order 1 will be locked first, lock_order 2 second. No two
records of the same lock order may be locked at the same time.

source3/lib/dbwrap/dbwrap.c
source3/lib/dbwrap/dbwrap_open.c
source3/lib/dbwrap/dbwrap_private.h
source3/lib/dbwrap/dbwrap_rbt.c

index c551bfdecaf9ca20130b773f3bbe42bef3fbdbb8..e443fa5eb3481d6d6f737088261da3498f82335f 100644 (file)
@@ -75,11 +75,90 @@ NTSTATUS dbwrap_record_delete(struct db_record *rec)
        return rec->delete_rec(rec);
 }
 
+struct dbwrap_lock_order_state {
+       uint8_t *plock_order_mask;
+       uint8_t bitmask;
+};
+
+static int dbwrap_lock_order_state_destructor(
+       struct dbwrap_lock_order_state *s)
+{
+       *s->plock_order_mask &= ~s->bitmask;
+       return 0;
+}
+
+static struct dbwrap_lock_order_state *dbwrap_check_lock_order(
+       struct db_context *db, TALLOC_CTX *mem_ctx)
+{
+       /*
+        * Store the lock_order of currently locked records as bits in
+        * "lock_order_mask". We only use levels 1,2,3 right now, so a
+        * single uint8_t is enough.
+        */
+       static uint8_t lock_order_mask;
+
+       struct dbwrap_lock_order_state *state;
+       uint8_t idx;
+       int used;
+
+       if (db->lock_order == 0) {
+               /*
+                * lock order 0 is for example for dbwrap_rbt without
+                * real locking. Return state nevertheless to avoid
+                * special cases.
+                */
+               return talloc(mem_ctx, struct dbwrap_lock_order_state);
+       }
+
+       /*
+        * We fill bits from the high bits, to be able to use
+        * "ffs(lock_order_mask)"
+        */
+       idx = sizeof(lock_order_mask)*8 - db->lock_order;
+
+       used = ffs(lock_order_mask);
+
+       DEBUG(1, ("used=%d, lock_order=%d, idx=%d\n", used,
+                 (int)db->lock_order, (int)idx));
+
+       if ((used != 0) && (used-1 <= idx)) {
+               DEBUG(0, ("Lock order violation: Trying %d, order_mask=%x\n",
+                         (int)db->lock_order, (int)lock_order_mask));
+               return NULL;
+       }
+
+       state = talloc(mem_ctx, struct dbwrap_lock_order_state);
+       if (state == NULL) {
+               DEBUG(1, ("talloc failed\n"));
+               return NULL;
+       }
+       state->bitmask = 1 << idx;
+       state->plock_order_mask = &lock_order_mask;
+
+       talloc_set_destructor(state, dbwrap_lock_order_state_destructor);
+       lock_order_mask |= state->bitmask;
+
+       return state;
+}
+
 struct db_record *dbwrap_fetch_locked(struct db_context *db,
                                      TALLOC_CTX *mem_ctx,
                                      TDB_DATA key)
 {
-       return db->fetch_locked(db, mem_ctx, key);
+       struct db_record *rec;
+       struct dbwrap_lock_order_state *lock_order;
+
+       lock_order = dbwrap_check_lock_order(db, talloc_tos());
+       if (lock_order == NULL) {
+               return NULL;
+       }
+       rec = db->fetch_locked(db, mem_ctx, key);
+       if (rec == NULL) {
+               TALLOC_FREE(lock_order);
+               return NULL;
+       }
+       (void)talloc_steal(rec, lock_order);
+       return rec;
 }
 
 struct dbwrap_fetch_state {
index 23d299511b4e245e4c1a73acbb1a3e52a87a96dc..af24ed884741264fae1561b167490ca18aa37566 100644 (file)
@@ -124,5 +124,8 @@ struct db_context *db_open(TALLOC_CTX *mem_ctx,
                result = db_open_tdb(mem_ctx, name, hash_size,
                                     tdb_flags, open_flags, mode);
        }
+       if (result != NULL) {
+               result->lock_order = lock_order;
+       }
        return result;
 }
index d0b32793684296b179a90aca1db3a00e33bd12cd..111f02dc6b56bf2c616783249c82ece155c5e237 100644 (file)
@@ -56,6 +56,7 @@ struct db_context {
        int (*exists)(struct db_context *db,TDB_DATA key);
        int (*wipe)(struct db_context *db);
        void *private_data;
+       enum dbwrap_lock_order lock_order;
        bool persistent;
 };
 
index 2460418d1d24865b4975ff4394f3ce7b46bbda74..4fbb0bc1a022e04ba649f70ab073944dcaa5bc66 100644 (file)
@@ -435,6 +435,7 @@ struct db_context *db_open_rbt(TALLOC_CTX *mem_ctx)
        result->exists = db_rbt_exists;
        result->wipe = db_rbt_wipe;
        result->parse_record = db_rbt_parse_record;
+       result->lock_order = 0;
 
        return result;
 }