r10421: following on discussions with simo, I have worked out a way of
authorAndrew Tridgell <tridge@samba.org>
Thu, 22 Sep 2005 13:12:46 +0000 (13:12 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 18:38:44 +0000 (13:38 -0500)
allowing searches to proceed while another process is in a
transaction, then only upgrading the transaction lock to a write lock
on commit.

The solution is:

 - split tdb_traverse() into two calls, called tdb_traverse() and
   tdb_traverse_read(). The _read() version only gets read locks, and
   will fail any write operations made in the callback from the
   traverse.

 - the normal tdb_traverse() call allows for read or write operations
   in the callback, but gets the transaction lock, preventing
   transastions from starting inside the traverse

In addition we enforce the following rule that you may not start a
transaction within a traverse callback, although you can start a
traverse within a transaction

With these rules in place I believe all the deadlock possibilities are
removed, and we can now allow for searches to happen in parallel with
transactions
(This used to be commit 7dd31288a701d772e45b1960ac4ce4cc1be782ed)

source4/lib/tdb/common/error.c
source4/lib/tdb/common/io.c
source4/lib/tdb/common/lock.c
source4/lib/tdb/common/tdb.c
source4/lib/tdb/common/tdb_private.h
source4/lib/tdb/common/transaction.c
source4/lib/tdb/common/traverse.c
source4/lib/tdb/docs/README
source4/lib/tdb/include/tdb.h
source4/lib/tdb/tools/tdbtorture.c

index 270a441463396840b06d176a17fffb5e0e9f37b3..4cf33a29ab2e21abc89c19970202d40c9b13cbe0 100644 (file)
@@ -43,7 +43,8 @@ static struct tdb_errname {
             {TDB_ERR_EXISTS, "Record exists"},
             {TDB_ERR_NOLOCK, "Lock exists on other keys"},
             {TDB_ERR_EINVAL, "Invalid parameter"},
-            {TDB_ERR_NOEXIST, "Record does not exist"} };
+            {TDB_ERR_NOEXIST, "Record does not exist"},
+            {TDB_ERR_RDONLY, "write not permitted"} };
 
 /* Error string for the last tdb error */
 const char *tdb_errorstr(struct tdb_context *tdb)
index f02cd1a0e1a96f62665734113681d036b7b6ac18..c1f6f55bc48a086aacd1a1bb930090a16e6ffa4b 100644 (file)
@@ -97,6 +97,11 @@ static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
 static int tdb_write(struct tdb_context *tdb, tdb_off_t off, 
                     const void *buf, tdb_len_t len)
 {
+       if (tdb->read_only) {
+               tdb->ecode = TDB_ERR_RDONLY;
+               return -1;
+       }
+
        if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
                return -1;
 
@@ -224,6 +229,12 @@ void tdb_mmap(struct tdb_context *tdb)
 static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
 {
        char buf[1024];
+
+       if (tdb->read_only) {
+               tdb->ecode = TDB_ERR_RDONLY;
+               return -1;
+       }
+
        if (ftruncate(tdb->fd, size+addition) == -1) {
                char b = 0;
                if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
index 8061bab7b11dbd2842ba96875739c6103ee9f61c..7a76bd3d9b3a3f034fe07a6a3dcad08e59404d96 100644 (file)
@@ -42,10 +42,12 @@ int tdb_brlock_len(struct tdb_context *tdb, tdb_off_t offset,
        struct flock fl;
        int ret;
 
-       if (tdb->flags & TDB_NOLOCK)
+       if (tdb->flags & TDB_NOLOCK) {
                return 0;
+       }
+
        if ((rw_type == F_WRLCK) && (tdb->read_only)) {
-               errno = EACCES;
+               tdb->ecode = TDB_ERR_RDONLY;
                return -1;
        }
 
@@ -64,8 +66,8 @@ int tdb_brlock_len(struct tdb_context *tdb, tdb_off_t offset,
                 * EAGAIN is an expected return from non-blocking
                 * locks. */
                if (errno != EAGAIN) {
-                       TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
-                                tdb->fd, offset, rw_type, lck_type, 
+                       TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d: %s\n", 
+                                tdb->fd, offset, rw_type, lck_type, len,
                                 strerror(errno)));
                } else if (!probe && lck_type != F_SETLK) {
                        /* Ensure error code is set for log fun to examine. */
index c37d37a4f2f644ea5a9402441eb5f922fba98b2b..2e229e88ccb1d56da6cbc179c9dabeb8edf15630 100644 (file)
@@ -227,6 +227,11 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        char *p = NULL;
        int ret = 0;
 
+       if (tdb->read_only) {
+               tdb->ecode = TDB_ERR_RDONLY;
+               return -1;
+       }
+
        /* find which hash bucket it is in */
        hash = tdb->hash_fn(&key);
        if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
index eefcc52557a17328eafe767a654a8e819cfd426a..ba0a379e7fcb4c796f54c2e9b619daecef41e242 100644 (file)
@@ -162,6 +162,7 @@ struct tdb_traverse_lock {
        struct tdb_traverse_lock *next;
        u32 off;
        u32 hash;
+       int lock_rw;
 };
 
 
index b9d44a72831e6fdd9a4b33000baaf8e9083fab08..75e91c56ccbe7df058a048d72ddbb1cfe5bb6cdf 100644 (file)
@@ -372,6 +372,15 @@ int tdb_transaction_start(struct tdb_context *tdb)
                return -1;
        }
 
+       if (tdb->travlocks.next != NULL) {
+               /* you cannot use transactions inside a traverse (although you can use
+                  traverse inside a transaction) as otherwise you can end up with
+                  deadlock */
+               TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
+               tdb->ecode = TDB_ERR_LOCK;
+               return -1;
+       }
+
        tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
        if (tdb->transaction == NULL) {
                tdb->ecode = TDB_ERR_OOM;
@@ -388,15 +397,9 @@ int tdb_transaction_start(struct tdb_context *tdb)
                return -1;
        }
        
-       /* get a write lock from the freelist to the end of file. It
-          would be much better to make this a read lock as it would
-          increase parallelism, but it could lead to deadlocks on
-          commit when a write lock needs to be taken. 
-
-          TODO: look at alternative locking strategies to allow this
-          to be a read lock 
-       */
-       if (tdb_brlock_len(tdb, FREELIST_TOP, F_WRLCK, F_SETLKW, 0, 0) == -1) {
+       /* get a read lock from the freelist to the end of file. This
+          is upgraded to a write lock during the commit */
+       if (tdb_brlock_len(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
                TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get hash locks\n"));
                tdb->ecode = TDB_ERR_LOCK;
                goto fail;
@@ -763,6 +766,14 @@ int tdb_transaction_commit(struct tdb_context *tdb)
                return -1;
        }
 
+       /* upgrade the main transaction lock region to a write lock */
+       if (tdb_brlock_len(tdb, FREELIST_TOP, F_WRLCK, F_SETLKW, 0, 0) == -1) {
+               TDB_LOG((tdb, 0, "tdb_transaction_start: failed to upgrade hash locks\n"));
+               tdb->ecode = TDB_ERR_LOCK;
+               tdb_transaction_cancel(tdb);
+               return -1;
+       }
+
        /* get the global lock - this prevents new users attaching to the database
           during the commit */
        if (tdb_brlock_len(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
index 7d1e99cbe8f832a79abedb39ef0edd90b1b4cbce..335dce41527ffba91eb8a6f0fc43828cac6b36eb 100644 (file)
@@ -71,7 +71,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc
                        }
                }
 
-               if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
+               if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
                        return -1;
 
                /* No previous record?  Start at top of chain. */
@@ -118,7 +118,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc
                            tdb_do_delete(tdb, current, rec) != 0)
                                goto fail;
                }
-               tdb_unlock(tdb, tlock->hash, F_WRLCK);
+               tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
                want_next = 0;
        }
        /* We finished iteration without finding anything */
@@ -126,7 +126,7 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc
 
  fail:
        tlock->off = 0;
-       if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
+       if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
                TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
        return -1;
 }
@@ -136,32 +136,33 @@ static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tloc
    if fn is NULL then it is not called
    a non-zero return value from fn() indicates that the traversal should stop
   */
-int tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *private)
+static int tdb_traverse_internal(struct tdb_context *tdb, 
+                                tdb_traverse_func fn, void *private,
+                                struct tdb_traverse_lock *tl)
 {
        TDB_DATA key, dbuf;
        struct list_struct rec;
-       struct tdb_traverse_lock tl = { NULL, 0, 0 };
        int ret, count = 0;
 
        /* This was in the initializaton, above, but the IRIX compiler
         * did not like it.  crh
         */
-       tl.next = tdb->travlocks.next;
+       tl->next = tdb->travlocks.next;
 
        /* fcntl locks don't stack: beware traverse inside traverse */
-       tdb->travlocks.next = &tl;
+       tdb->travlocks.next = tl;
 
        /* tdb_next_lock places locks on the record returned, and its chain */
-       while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
+       while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
                count++;
                /* now read the full record */
-               key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
+               key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec), 
                                          rec.key_len + rec.data_len);
                if (!key.dptr) {
                        ret = -1;
-                       if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
+                       if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
                                goto out;
-                       if (tdb_unlock_record(tdb, tl.off) != 0)
+                       if (tdb_unlock_record(tdb, tl->off) != 0)
                                TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
                        goto out;
                }
@@ -170,31 +171,70 @@ int tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *private)
                dbuf.dsize = rec.data_len;
 
                /* Drop chain lock, call out */
-               if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
+               if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
                        ret = -1;
                        goto out;
                }
                if (fn && fn(tdb, key, dbuf, private)) {
                        /* They want us to terminate traversal */
                        ret = count;
-                       if (tdb_unlock_record(tdb, tl.off) != 0) {
+                       if (tdb_unlock_record(tdb, tl->off) != 0) {
                                TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
                                ret = -1;
                        }
-                       tdb->travlocks.next = tl.next;
+                       tdb->travlocks.next = tl->next;
                        SAFE_FREE(key.dptr);
                        return count;
                }
                SAFE_FREE(key.dptr);
        }
 out:
-       tdb->travlocks.next = tl.next;
+       tdb->travlocks.next = tl->next;
        if (ret < 0)
                return -1;
        else
                return count;
 }
 
+
+/*
+  a write style traverse - temporarily marks the db read only
+*/
+int tdb_traverse_read(struct tdb_context *tdb, 
+                     tdb_traverse_func fn, void *private)
+{
+       struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
+       int ret, read_only = tdb->read_only;
+       tdb->read_only = 1;
+       ret = tdb_traverse_internal(tdb, fn, private, &tl);
+       tdb->read_only = read_only;
+       return ret;
+}
+
+/*
+  a write style traverse - needs to get the transaction lock to
+  prevent deadlocks
+*/
+int tdb_traverse(struct tdb_context *tdb, 
+                tdb_traverse_func fn, void *private)
+{
+       struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
+       int ret;
+       
+       if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
+               TDB_LOG((tdb, 0, "tdb_traverse: failed to get transaction lock\n"));
+               tdb->ecode = TDB_ERR_LOCK;
+               return -1;
+       }
+
+       ret = tdb_traverse_internal(tdb, fn, private, &tl);
+
+       tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0);
+
+       return ret;
+}
+
+
 /* find the first entry in the database and return its key */
 TDB_DATA tdb_firstkey(struct tdb_context *tdb)
 {
index 18b32de37f65d804d4c6b1d6c15cf2ebcf7dc4a8..b31ce36ab149b7416043990de4f2eda528e66637 100644 (file)
@@ -127,7 +127,25 @@ int tdb_traverse(TDB_CONTEXT *tdb, int (*fn)(TDB_CONTEXT *tdb,
 
    if fn is NULL then it is not called
 
-   a non-zero return value from fn() indicates that the traversal should stop
+   a non-zero return value from fn() indicates that the traversal
+   should stop. Traversal callbacks may not start transactions.
+
+----------------------------------------------------------------------
+int tdb_traverse_read(TDB_CONTEXT *tdb, int (*fn)(TDB_CONTEXT *tdb,
+                     TDB_DATA key, TDB_DATA dbuf, void *state), void *state);
+
+   traverse the entire database - calling fn(tdb, key, data, state) on
+   each element, but marking the database read only during the
+   traversal, so any write operations will fail. This allows tdb to
+   use read locks, which increases the parallelism possible during the
+   traversal.
+
+   return -1 on error or the record count traversed
+
+   if fn is NULL then it is not called
+
+   a non-zero return value from fn() indicates that the traversal
+   should stop. Traversal callbacks may not start transactions.
 
 ----------------------------------------------------------------------
 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb);
index 3f123d814cfd7a79ae29d7f593ea89df9b2a4750..c116f29fc18849490c0b9cfea96e6756ce39f9dd 100644 (file)
@@ -52,7 +52,7 @@ extern "C" {
 /* error codes */
 enum TDB_ERROR {TDB_SUCCESS=0, TDB_ERR_CORRUPT, TDB_ERR_IO, TDB_ERR_LOCK, 
                TDB_ERR_OOM, TDB_ERR_EXISTS, TDB_ERR_NOLOCK, TDB_ERR_LOCK_TIMEOUT,
-               TDB_ERR_NOEXIST, TDB_ERR_EINVAL};
+               TDB_ERR_NOEXIST, TDB_ERR_EINVAL, TDB_ERR_RDONLY};
 
 typedef struct TDB_DATA {
        unsigned char *dptr;
@@ -98,6 +98,7 @@ int tdb_close(struct tdb_context *tdb);
 TDB_DATA tdb_firstkey(struct tdb_context *tdb);
 TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA key);
 int tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *);
+int tdb_traverse_read(struct tdb_context *tdb, tdb_traverse_func fn, void *);
 int tdb_exists(struct tdb_context *tdb, TDB_DATA key);
 int tdb_lockall(struct tdb_context *tdb);
 void tdb_unlockall(struct tdb_context *tdb);
index b0a2e7484f73d10d84fe3595976ee0c1589b3169..c0076a92d4602367f00526bba6b2152503cafc9e 100644 (file)
@@ -39,6 +39,7 @@
 #define TRANSACTION_PROB 10
 #define LOCKSTORE_PROB 5
 #define TRAVERSE_PROB 20
+#define TRAVERSE_READ_PROB 20
 #define CULL_PROB 100
 #define KEYLEN 3
 #define DATALEN 100
@@ -192,6 +193,13 @@ static void addrec_db(void)
        }
 #endif
 
+#if TRAVERSE_READ_PROB
+       if (random() % TRAVERSE_READ_PROB == 0) {
+               tdb_traverse_read(db, NULL, NULL);
+               goto next;
+       }
+#endif
+
        data = tdb_fetch(db, key);
        if (data.dptr) free(data.dptr);
 
@@ -273,7 +281,7 @@ static void usage(void)
                addrec_db();
        }
 
-       tdb_traverse(db, NULL, NULL);
+       tdb_traverse_read(db, NULL, NULL);
        tdb_traverse(db, traverse_fn, NULL);
        tdb_traverse(db, traverse_fn, NULL);