tdb: cleanup: split brlock and brunlock methods.
authorRusty Russell <rusty@rustcorp.com.au>
Wed, 17 Feb 2010 01:47:19 +0000 (12:17 +1030)
committerRusty Russell <rusty@rustcorp.com.au>
Wed, 17 Feb 2010 01:47:19 +0000 (12:17 +1030)
This is taken from the CCAN code base: rather than using tdb_brlock for
locking and unlocking, we split it into brlock and brunlock functions.

For extra debugging information, brunlock says what kind of lock it is
unlocking (even though fnctl locks don't need this).  This requires an
extra argument to tdb_transaction_unlock() so we know whether the
lock was upgraded to a write lock or not.

We also use a "flags" argument tdb_brlock:
1) TDB_LOCK_NOWAIT replaces lck_type = F_SETLK (vs F_SETLKW).
2) TDB_LOCK_MARK_ONLY replaces setting TDB_MARK_LOCK bit in ltype.
3) TDB_LOCK_PROBE replaces the "probe" argument.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
lib/tdb/common/io.c
lib/tdb/common/lock.c
lib/tdb/common/open.c
lib/tdb/common/tdb.c
lib/tdb/common/tdb_private.h
lib/tdb/common/transaction.c
lib/tdb/common/traverse.c

index d549715f831b414fc098b334a97f6572c74ffc76..85f69ab0d6ec7476b9e12f181f80715177b20974 100644 (file)
@@ -461,7 +461,8 @@ static const struct tdb_methods io_methods = {
        tdb_next_hash_chain,
        tdb_oob,
        tdb_expand_file,
-       tdb_brlock
+       tdb_brlock,
+       tdb_brunlock
 };
 
 /*
index 0984e516ea14cd88530fa29d75fe05fbbf4bde39..515bffee598f01df50f73524d214b21b72f79510 100644 (file)
 
 #include "tdb_private.h"
 
-#define TDB_MARK_LOCK 0x80000000
-
 void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
 {
        tdb->interrupt_sig_ptr = ptr;
 }
 
+static int fcntl_lock(struct tdb_context *tdb,
+                     int rw, off_t off, off_t len, bool waitflag)
+{
+       struct flock fl;
+
+       fl.l_type = rw;
+       fl.l_whence = SEEK_SET;
+       fl.l_start = off;
+       fl.l_len = len;
+       fl.l_pid = 0;
+
+       if (waitflag)
+               return fcntl(tdb->fd, F_SETLKW, &fl);
+       else
+               return fcntl(tdb->fd, F_SETLK, &fl);
+}
+
+static int fcntl_unlock(struct tdb_context *tdb, int rw, off_t off, off_t len)
+{
+       struct flock fl;
+#if 0 /* Check they matched up locks and unlocks correctly. */
+       char line[80];
+       FILE *locks;
+       bool found = false;
+
+       locks = fopen("/proc/locks", "r");
+
+       while (fgets(line, 80, locks)) {
+               char *p;
+               int type, start, l;
+
+               /* eg. 1: FLOCK  ADVISORY  WRITE 2440 08:01:2180826 0 EOF */
+               p = strchr(line, ':') + 1;
+               if (strncmp(p, " POSIX  ADVISORY  ", strlen(" POSIX  ADVISORY  ")))
+                       continue;
+               p += strlen(" FLOCK  ADVISORY  ");
+               if (strncmp(p, "READ  ", strlen("READ  ")) == 0)
+                       type = F_RDLCK;
+               else if (strncmp(p, "WRITE ", strlen("WRITE ")) == 0)
+                       type = F_WRLCK;
+               else
+                       abort();
+               p += 6;
+               if (atoi(p) != getpid())
+                       continue;
+               p = strchr(strchr(p, ' ') + 1, ' ') + 1;
+               start = atoi(p);
+               p = strchr(p, ' ') + 1;
+               if (strncmp(p, "EOF", 3) == 0)
+                       l = 0;
+               else
+                       l = atoi(p) - start + 1;
+
+               if (off == start) {
+                       if (len != l) {
+                               fprintf(stderr, "Len %u should be %u: %s",
+                                       (int)len, l, line);
+                               abort();
+                       }
+                       if (type != rw) {
+                               fprintf(stderr, "Type %s wrong: %s",
+                                       rw == F_RDLCK ? "READ" : "WRITE", line);
+                               abort();
+                       }
+                       found = true;
+                       break;
+               }
+       }
+
+       if (!found) {
+               fprintf(stderr, "Unlock on %u@%u not found!\n",
+                       (int)off, (int)len);
+               abort();
+       }
+
+       fclose(locks);
+#endif
+
+       fl.l_type = F_UNLCK;
+       fl.l_whence = SEEK_SET;
+       fl.l_start = off;
+       fl.l_len = len;
+       fl.l_pid = 0;
+
+       return fcntl(tdb->fd, F_SETLKW, &fl);
+}
+
 /* a byte range locking function - return 0 on success
    this functions locks/unlocks 1 byte at the specified offset.
 
@@ -42,30 +127,28 @@ void tdb_setalarm_sigptr(struct tdb_context *tdb, volatile sig_atomic_t *ptr)
 
    note that a len of zero means lock to end of file
 */
-int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, 
-              int rw_type, int lck_type, int probe, size_t len)
+int tdb_brlock(struct tdb_context *tdb,
+              int rw_type, tdb_off_t offset, size_t len,
+              enum tdb_lock_flags flags)
 {
-       struct flock fl;
        int ret;
 
        if (tdb->flags & TDB_NOLOCK) {
                return 0;
        }
 
+       if (flags & TDB_LOCK_MARK_ONLY) {
+               return 0;
+       }
+
        if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
                tdb->ecode = TDB_ERR_RDONLY;
                return -1;
        }
 
-       fl.l_type = rw_type;
-       fl.l_whence = SEEK_SET;
-       fl.l_start = offset;
-       fl.l_len = len;
-       fl.l_pid = 0;
-
        do {
-               ret = fcntl(tdb->fd,lck_type,&fl);
-
+               ret = fcntl_lock(tdb, rw_type, offset, len,
+                                flags & TDB_LOCK_WAIT);
                /* Check for a sigalarm break. */
                if (ret == -1 && errno == EINTR &&
                                tdb->interrupt_sig_ptr &&
@@ -79,15 +162,34 @@ int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
                /* Generic lock error. errno set by fcntl.
                 * EAGAIN is an expected return from non-blocking
                 * locks. */
-               if (!probe && lck_type != F_SETLK) {
-                       TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n", 
-                                tdb->fd, offset, rw_type, lck_type, (int)len));
+               if (!(flags & TDB_LOCK_PROBE) && errno != EAGAIN) {
+                       TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d flags=%d len=%d\n",
+                                tdb->fd, offset, rw_type, flags, (int)len));
                }
                return -1;
        }
        return 0;
 }
 
+int tdb_brunlock(struct tdb_context *tdb,
+                int rw_type, tdb_off_t offset, size_t len)
+{
+       int ret;
+
+       if (tdb->flags & TDB_NOLOCK) {
+               return 0;
+       }
+
+       do {
+               ret = fcntl_unlock(tdb, rw_type, offset, len);
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brunlock failed (fd=%d) at offset %d rw_type=%d len=%d\n",
+                        tdb->fd, offset, rw_type, (int)len));
+       }
+       return ret;
+}
 
 /*
   upgrade a read lock to a write lock. This needs to be handled in a
@@ -100,7 +202,8 @@ int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
        int count = 1000;
        while (count--) {
                struct timeval tv;
-               if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
+               if (tdb_brlock(tdb, F_WRLCK, offset, len,
+                              TDB_LOCK_WAIT|TDB_LOCK_PROBE) == 0) {
                        return 0;
                }
                if (errno != EDEADLK) {
@@ -117,13 +220,11 @@ int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
 
 
 /* lock a list in the database. list -1 is the alloc list */
-static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
+static int _tdb_lock(struct tdb_context *tdb, int list, int ltype,
+                    enum tdb_lock_flags flags)
 {
        struct tdb_lock_type *new_lck;
        int i;
-       bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
-
-       ltype &= ~TDB_MARK_LOCK;
 
        /* a global lock allows us to avoid per chain locks */
        if (tdb->global_lock.count && 
@@ -175,9 +276,7 @@ static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
 
        /* Since fcntl locks don't nest, we do a lock for the first one,
           and simply bump the count for future ones */
-       if (!mark_lock &&
-           tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list, ltype, op,
-                                    0, 1)) {
+       if (tdb->methods->brlock(tdb, ltype, FREELIST_TOP+4*list, 1, flags)) {
                return -1;
        }
 
@@ -195,7 +294,8 @@ static int _tdb_lock(struct tdb_context *tdb, int list, int ltype, int op)
 int tdb_lock(struct tdb_context *tdb, int list, int ltype)
 {
        int ret;
-       ret = _tdb_lock(tdb, list, ltype, F_SETLKW);
+
+       ret = _tdb_lock(tdb, list, ltype, TDB_LOCK_WAIT);
        if (ret) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
                         "ltype=%d (%s)\n",  list, ltype, strerror(errno)));
@@ -206,21 +306,16 @@ int tdb_lock(struct tdb_context *tdb, int list, int ltype)
 /* lock a list in the database. list -1 is the alloc list. non-blocking lock */
 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype)
 {
-       return _tdb_lock(tdb, list, ltype, F_SETLK);
+       return _tdb_lock(tdb, list, ltype, TDB_LOCK_NOWAIT);
 }
 
 
-/* unlock the database: returns void because it's too late for errors. */
-       /* changed to return int it may be interesting to know there
-          has been an error  --simo */
-int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
+static int _tdb_unlock(struct tdb_context *tdb, int list, int ltype,
+                      bool mark_lock)
 {
        int ret = -1;
        int i;
        struct tdb_lock_type *lck = NULL;
-       bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
-
-       ltype &= ~TDB_MARK_LOCK;
 
        /* a global lock allows us to avoid per chain locks */
        if (tdb->global_lock.count && 
@@ -269,8 +364,8 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
        if (mark_lock) {
                ret = 0;
        } else {
-               ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
-                                              F_SETLKW, 0, 1);
+               ret = tdb->methods->brunlock(tdb, ltype,
+                                            FREELIST_TOP+4*list, 1);
        }
        tdb->num_locks--;
 
@@ -298,6 +393,11 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
        return ret;
 }
 
+int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
+{
+       return _tdb_unlock(tdb, list, ltype, false);
+}
+
 /*
   get the transaction lock
  */
@@ -311,8 +411,7 @@ int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
                return 0;
        }
 
-       if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, ltype, 
-                                    F_SETLKW, 0, 1) == -1) {
+       if (tdb->methods->brlock(tdb, ltype, TRANSACTION_LOCK, 1, TDB_LOCK_WAIT) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_lock: failed to get transaction lock\n"));
                tdb->ecode = TDB_ERR_LOCK;
                return -1;
@@ -324,7 +423,7 @@ int tdb_transaction_lock(struct tdb_context *tdb, int ltype)
 /*
   release the transaction lock
  */
-int tdb_transaction_unlock(struct tdb_context *tdb)
+int tdb_transaction_unlock(struct tdb_context *tdb, int ltype)
 {
        int ret;
        if (tdb->global_lock.count) {
@@ -334,7 +433,7 @@ int tdb_transaction_unlock(struct tdb_context *tdb)
                tdb->transaction_lock_count--;
                return 0;
        }
-       ret = tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+       ret = tdb->methods->brunlock(tdb, ltype, TRANSACTION_LOCK, 1);
        if (ret == 0) {
                tdb->transaction_lock_count = 0;
        }
@@ -345,12 +444,9 @@ int tdb_transaction_unlock(struct tdb_context *tdb)
 
 
 /* lock/unlock entire database */
-static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
+static int _tdb_lockall(struct tdb_context *tdb, int ltype,
+                       enum tdb_lock_flags flags)
 {
-       bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
-
-       ltype &= ~TDB_MARK_LOCK;
-
        /* There are no locks on read-only dbs */
        if (tdb->read_only || tdb->traverse_read) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -374,10 +470,10 @@ static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
                return -1;
        }
 
-       if (!mark_lock &&
-           tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
-                                    0, 4*tdb->header.hash_size)) {
-               if (op == F_SETLKW) {
+       if (tdb->methods->brlock(tdb, ltype,
+                                FREELIST_TOP, 4*tdb->header.hash_size,
+                                flags)) {
+               if (flags & TDB_LOCK_WAIT) {
                        TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
                }
                return -1;
@@ -392,12 +488,8 @@ static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
 
 
 /* unlock entire db */
-static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
+static int _tdb_unlockall(struct tdb_context *tdb, int ltype, bool mark_lock)
 {
-       bool mark_lock = ((ltype & TDB_MARK_LOCK) == TDB_MARK_LOCK);
-
-       ltype &= ~TDB_MARK_LOCK;
-
        /* There are no locks on read-only dbs */
        if (tdb->read_only || tdb->traverse_read) {
                tdb->ecode = TDB_ERR_LOCK;
@@ -415,8 +507,8 @@ static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
        }
 
        if (!mark_lock &&
-           tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 
-                                    0, 4*tdb->header.hash_size)) {
+           tdb->methods->brunlock(tdb, ltype,
+                                  FREELIST_TOP, 4*tdb->header.hash_size)) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
                return -1;
        }
@@ -431,27 +523,27 @@ static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
 int tdb_lockall(struct tdb_context *tdb)
 {
        tdb_trace(tdb, "tdb_lockall");
-       return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
+       return _tdb_lockall(tdb, F_WRLCK, TDB_LOCK_WAIT);
 }
 
 /* lock entire database with write lock - mark only */
 int tdb_lockall_mark(struct tdb_context *tdb)
 {
        tdb_trace(tdb, "tdb_lockall_mark");
-       return _tdb_lockall(tdb, F_WRLCK | TDB_MARK_LOCK, F_SETLKW);
+       return _tdb_lockall(tdb, F_WRLCK, TDB_LOCK_MARK_ONLY);
 }
 
 /* unlock entire database with write lock - unmark only */
 int tdb_lockall_unmark(struct tdb_context *tdb)
 {
        tdb_trace(tdb, "tdb_lockall_unmark");
-       return _tdb_unlockall(tdb, F_WRLCK | TDB_MARK_LOCK);
+       return _tdb_unlockall(tdb, F_WRLCK, true);
 }
 
 /* lock entire database with write lock - nonblocking varient */
 int tdb_lockall_nonblock(struct tdb_context *tdb)
 {
-       int ret = _tdb_lockall(tdb, F_WRLCK, F_SETLK);
+       int ret = _tdb_lockall(tdb, F_WRLCK, TDB_LOCK_NOWAIT);
        tdb_trace_ret(tdb, "tdb_lockall_nonblock", ret);
        return ret;
 }
@@ -460,20 +552,20 @@ int tdb_lockall_nonblock(struct tdb_context *tdb)
 int tdb_unlockall(struct tdb_context *tdb)
 {
        tdb_trace(tdb, "tdb_unlockall");
-       return _tdb_unlockall(tdb, F_WRLCK);
+       return _tdb_unlockall(tdb, F_WRLCK, false);
 }
 
 /* lock entire database with read lock */
 int tdb_lockall_read(struct tdb_context *tdb)
 {
        tdb_trace(tdb, "tdb_lockall_read");
-       return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
+       return _tdb_lockall(tdb, F_RDLCK, TDB_LOCK_WAIT);
 }
 
 /* lock entire database with read lock - nonblock varient */
 int tdb_lockall_read_nonblock(struct tdb_context *tdb)
 {
-       int ret = _tdb_lockall(tdb, F_RDLCK, F_SETLK);
+       int ret = _tdb_lockall(tdb, F_RDLCK, TDB_LOCK_NOWAIT);
        tdb_trace_ret(tdb, "tdb_lockall_read_nonblock", ret);
        return ret;
 }
@@ -482,7 +574,7 @@ int tdb_lockall_read_nonblock(struct tdb_context *tdb)
 int tdb_unlockall_read(struct tdb_context *tdb)
 {
        tdb_trace(tdb, "tdb_unlockall_read");
-       return _tdb_unlockall(tdb, F_RDLCK);
+       return _tdb_unlockall(tdb, F_RDLCK, false);
 }
 
 /* lock/unlock one hash chain. This is meant to be used to reduce
@@ -507,7 +599,8 @@ int tdb_chainlock_nonblock(struct tdb_context *tdb, TDB_DATA key)
 /* mark a chain as locked without actually locking it. Warning! use with great caution! */
 int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
 {
-       int ret = tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
+       int ret = _tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK,
+                           TDB_LOCK_MARK_ONLY);
        tdb_trace_1rec(tdb, "tdb_chainlock_mark", key);
        return ret;
 }
@@ -516,7 +609,7 @@ int tdb_chainlock_mark(struct tdb_context *tdb, TDB_DATA key)
 int tdb_chainlock_unmark(struct tdb_context *tdb, TDB_DATA key)
 {
        tdb_trace_1rec(tdb, "tdb_chainlock_unmark", key);
-       return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK | TDB_MARK_LOCK);
+       return _tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK, true);
 }
 
 int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
@@ -547,7 +640,7 @@ int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
        if (tdb->global_lock.count) {
                return 0;
        }
-       return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
+       return off ? tdb->methods->brlock(tdb, F_RDLCK, off, 1, TDB_LOCK_WAIT) : 0;
 }
 
 /*
@@ -561,16 +654,12 @@ int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
        for (i = &tdb->travlocks; i; i = i->next)
                if (i->off == off)
                        return -1;
-       return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
+       return tdb->methods->brlock(tdb, F_WRLCK, off, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
 }
 
-/*
-  Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
-  an error to fail to get the lock here.
-*/
 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
 {
-       return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
+       return tdb->methods->brunlock(tdb, F_WRLCK, off, 1);
 }
 
 /* fcntl locks don't stack: avoid unlocking someone else's */
@@ -588,5 +677,5 @@ int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
        for (i = &tdb->travlocks; i; i = i->next)
                if (i->off == off)
                        count++;
-       return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
+       return (count == 1 ? tdb->methods->brunlock(tdb, F_RDLCK, off, 1) : 0);
 }
index 4d4f95a3daa7e7c57a1349467c7503a077ba8df4..13d5c94e377216092e4bc05682b5bb6ea9f9eb9b 100644 (file)
@@ -241,7 +241,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
         fcntl(tdb->fd, F_SETFD, v | FD_CLOEXEC);
 
        /* ensure there is only one process initialising at once */
-       if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
+       if (tdb->methods->brlock(tdb, F_WRLCK, GLOBAL_LOCK, 1, TDB_LOCK_WAIT) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
                         name, strerror(errno)));
                goto fail;      /* errno set by tdb_brlock */
@@ -250,7 +250,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        /* we need to zero database if we are the only one with it open */
        if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
            (!tdb->read_only) &&
-           (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
+           (locked = (tdb->methods->brlock(tdb, F_WRLCK, ACTIVE_LOCK, 1, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE) == 0))) {
                open_flags |= O_CREAT;
                if (ftruncate(tdb->fd, 0) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
@@ -313,9 +313,9 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        tdb->inode = st.st_ino;
        tdb_mmap(tdb);
        if (locked) {
-               if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
+               if (tdb->methods->brunlock(tdb, F_WRLCK, ACTIVE_LOCK, 1) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
-                                "failed to take ACTIVE_LOCK on %s: %s\n",
+                                "failed to release ACTIVE_LOCK on %s: %s\n",
                                 name, strerror(errno)));
                        goto fail;
                }
@@ -328,7 +328,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
 
        if (tdb_flags & TDB_CLEAR_IF_FIRST) {
                /* leave this lock in place to indicate it's in use */
-               if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
+               if (tdb->methods->brlock(tdb, F_RDLCK, ACTIVE_LOCK, 1, TDB_LOCK_WAIT) == -1)
                        goto fail;
        }
 
@@ -357,7 +357,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        /* Internal (memory-only) databases skip all the code above to
         * do with disk files, and resume here by releasing their
         * global lock and hooking into the active list. */
-       if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
+       if (tdb->methods->brunlock(tdb, F_WRLCK, GLOBAL_LOCK, 1) == -1)
                goto fail;
        tdb->next = tdbs;
        tdbs = tdb;
@@ -407,10 +407,10 @@ int tdb_close(struct tdb_context *tdb)
        struct tdb_context **i;
        int ret = 0;
 
-       tdb_trace(tdb, "tdb_close");
        if (tdb->transaction) {
-               _tdb_transaction_cancel(tdb);
+               tdb_transaction_cancel(tdb);
        }
+       tdb_trace(tdb, "tdb_close");
 
        if (tdb->map_ptr) {
                if (tdb->flags & TDB_INTERNAL)
@@ -501,7 +501,7 @@ static int tdb_reopen_internal(struct tdb_context *tdb, bool active_lock)
 #endif /* fake pread or pwrite */
 
        if (active_lock &&
-           (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
+           (tdb->methods->brlock(tdb, F_RDLCK, ACTIVE_LOCK, 1, TDB_LOCK_WAIT) == -1)) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
                goto fail;
        }
index f6ae3fd14bb0e58c3657b1a770f3fde17b9736a9..9fa7287113d1efe6fb40f59dc57a230c06a07be3 100644 (file)
@@ -59,13 +59,14 @@ static void tdb_increment_seqnum(struct tdb_context *tdb)
                return;
        }
 
-       if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
+       if (tdb_brlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1,
+                      TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
                return;
        }
 
        tdb_increment_seqnum_nonblock(tdb);
 
-       tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
+       tdb_brunlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1);
 }
 
 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
index 0e74b103b31c729baef74ed5898c520effaaae1f..26d2cf55faf97115da4a9285fea308264f232017 100644 (file)
@@ -163,6 +163,15 @@ struct tdb_traverse_lock {
        int lock_rw;
 };
 
+enum tdb_lock_flags {
+       /* WAIT == F_SETLKW, NOWAIT == F_SETLK */
+       TDB_LOCK_NOWAIT = 0,
+       TDB_LOCK_WAIT = 1,
+       /* If set, don't log an error on failure. */
+       TDB_LOCK_PROBE = 2,
+       /* If set, don't actually lock at all. */
+       TDB_LOCK_MARK_ONLY = 4,
+};
 
 struct tdb_methods {
        int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
@@ -170,7 +179,8 @@ struct tdb_methods {
        void (*next_hash_chain)(struct tdb_context *, uint32_t *);
        int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
        int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
-       int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
+       int (*brlock)(struct tdb_context *, int, tdb_off_t, size_t, enum tdb_lock_flags);
+       int (*brunlock)(struct tdb_context *, int, tdb_off_t, size_t);
 };
 
 struct tdb_context {
@@ -215,9 +225,13 @@ void tdb_mmap(struct tdb_context *tdb);
 int tdb_lock(struct tdb_context *tdb, int list, int ltype);
 int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
 int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
-int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
+int tdb_brlock(struct tdb_context *tdb,
+              int rw_type, tdb_off_t offset, size_t len,
+              enum tdb_lock_flags flags);
+int tdb_brunlock(struct tdb_context *tdb,
+                int rw_type, tdb_off_t offset, size_t len);
 int tdb_transaction_lock(struct tdb_context *tdb, int ltype);
-int tdb_transaction_unlock(struct tdb_context *tdb);
+int tdb_transaction_unlock(struct tdb_context *tdb, int ltype);
 int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
 int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
 int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
@@ -230,7 +244,7 @@ int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
 int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
 int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
 int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
-int _tdb_transaction_cancel(struct tdb_context *tdb);
+int _tdb_transaction_cancel(struct tdb_context *tdb, int ltype);
 int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec);
 int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct tdb_record *rec);
 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec);
index 606eaa7a6fb01d2a54d5438e9b291cfcb8abeba6..9a15efce53f576209d893a146d7c1fda7267ac18 100644 (file)
@@ -414,8 +414,15 @@ static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
 /*
   brlock during a transaction - ignore them
 */
-static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
-                             int rw_type, int lck_type, int probe, size_t len)
+static int transaction_brlock(struct tdb_context *tdb,
+                             int rw_type, tdb_off_t offset, size_t len,
+                             enum tdb_lock_flags flags)
+{
+       return 0;
+}
+
+static int transaction_brunlock(struct tdb_context *tdb,
+                               int rw_type, tdb_off_t offset, size_t len)
 {
        return 0;
 }
@@ -426,7 +433,8 @@ static const struct tdb_methods transaction_methods = {
        transaction_next_hash_chain,
        transaction_oob,
        transaction_expand_file,
-       transaction_brlock
+       transaction_brlock,
+       transaction_brunlock
 };
 
 
@@ -494,7 +502,7 @@ int tdb_transaction_start(struct tdb_context *tdb)
        
        /* get a read lock from the freelist to the end of file. This
           is upgraded to a write lock during the commit */
-       if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
+       if (tdb_brlock(tdb, F_RDLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
                tdb->ecode = TDB_ERR_LOCK;
                goto fail;
@@ -530,8 +538,8 @@ int tdb_transaction_start(struct tdb_context *tdb)
        return 0;
        
 fail:
-       tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
-       tdb_transaction_unlock(tdb);
+       tdb_brunlock(tdb, F_RDLCK, FREELIST_TOP, 0);
+       tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->blocks);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
@@ -569,7 +577,8 @@ static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t
 }
 
 
-int _tdb_transaction_cancel(struct tdb_context *tdb)
+/* ltype is F_WRLCK after prepare. */
+int _tdb_transaction_cancel(struct tdb_context *tdb, int ltype)
 {      
        int i, ret = 0;
 
@@ -607,21 +616,22 @@ int _tdb_transaction_cancel(struct tdb_context *tdb)
        }
 
        if (tdb->transaction->global_lock_taken) {
-               tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+               tdb_brunlock(tdb, F_WRLCK, GLOBAL_LOCK, 1);
                tdb->transaction->global_lock_taken = false;
        }
 
        /* remove any global lock created during the transaction */
        if (tdb->global_lock.count != 0) {
-               tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
+               tdb_brunlock(tdb, tdb->global_lock.ltype,
+                            FREELIST_TOP, 4*tdb->header.hash_size);
                tdb->global_lock.count = 0;
        }
 
        /* remove any locks created during the transaction */
        if (tdb->num_locks != 0) {
                for (i=0;i<tdb->num_lockrecs;i++) {
-                       tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
-                                  F_UNLCK,F_SETLKW, 0, 1);
+                       tdb_brunlock(tdb, tdb->lockrecs[i].ltype,
+                                    FREELIST_TOP+4*tdb->lockrecs[i].list, 1);
                }
                tdb->num_locks = 0;
                tdb->num_lockrecs = 0;
@@ -631,8 +641,8 @@ int _tdb_transaction_cancel(struct tdb_context *tdb)
        /* restore the normal io methods */
        tdb->methods = tdb->transaction->io_methods;
 
-       tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
-       tdb_transaction_unlock(tdb);
+       tdb_brunlock(tdb, ltype, FREELIST_TOP, 0);
+       tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
        
@@ -644,8 +654,11 @@ int _tdb_transaction_cancel(struct tdb_context *tdb)
 */
 int tdb_transaction_cancel(struct tdb_context *tdb)
 {
+       int ltype = F_RDLCK;
        tdb_trace(tdb, "tdb_transaction_cancel");
-       return _tdb_transaction_cancel(tdb);
+       if (tdb->transaction && tdb->transaction->prepared)
+               ltype = F_WRLCK;
+       return _tdb_transaction_cancel(tdb, ltype);
 }
 
 /*
@@ -911,14 +924,14 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 
        if (tdb->transaction->prepared) {
                tdb->ecode = TDB_ERR_EINVAL;
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_WRLCK);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
                return -1;
        }
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
                return -1;
        }
@@ -940,7 +953,7 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
        if (tdb->num_locks || tdb->global_lock.count) {
                tdb->ecode = TDB_ERR_LOCK;
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                return -1;
        }
 
@@ -948,16 +961,16 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
        if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
                tdb->ecode = TDB_ERR_LOCK;
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                return -1;
        }
 
        /* get the global lock - this prevents new users attaching to the database
           during the commit */
-       if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
+       if (tdb_brlock(tdb, F_WRLCK, GLOBAL_LOCK, 1, TDB_LOCK_WAIT) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
                tdb->ecode = TDB_ERR_LOCK;
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_WRLCK);
                return -1;
        }
 
@@ -967,7 +980,7 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
                /* write the recovery data to the end of the file */
                if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
-                       _tdb_transaction_cancel(tdb);
+                       _tdb_transaction_cancel(tdb, F_WRLCK);
                        return -1;
                }
        }
@@ -981,7 +994,7 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
                                             tdb->transaction->old_map_size) == -1) {
                        tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
-                       _tdb_transaction_cancel(tdb);
+                       _tdb_transaction_cancel(tdb, F_WRLCK);
                        return -1;
                }
                tdb->map_size = tdb->transaction->old_map_size;
@@ -1020,7 +1033,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
                return -1;
        }
@@ -1033,7 +1046,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
        /* check for a null transaction */
        if (tdb->transaction->blocks == NULL) {
-               _tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                return 0;
        }
 
@@ -1069,7 +1082,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
                        tdb->methods = methods;
                        tdb_transaction_recover(tdb); 
 
-                       _tdb_transaction_cancel(tdb);
+                       _tdb_transaction_cancel(tdb, F_WRLCK);
 
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
                        return -1;
@@ -1104,7 +1117,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
        /* use a transaction cancel to free memory and remove the
           transaction locks */
-       _tdb_transaction_cancel(tdb);
+       _tdb_transaction_cancel(tdb, F_WRLCK);
 
        if (need_repack) {
                return tdb_repack(tdb);
index c340dd354b7ad6a239aa8ecc7d4c6d2a13e11b32..d329ef4da4030c095e530b0d7b05b344c2256305 100644 (file)
@@ -229,7 +229,7 @@ int tdb_traverse_read(struct tdb_context *tdb,
        ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
        tdb->traverse_read--;
 
-       tdb_transaction_unlock(tdb);
+       tdb_transaction_unlock(tdb, F_RDLCK);
 
        return ret;
 }
@@ -260,7 +260,7 @@ int tdb_traverse(struct tdb_context *tdb,
        ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
        tdb->traverse_write--;
 
-       tdb_transaction_unlock(tdb);
+       tdb_transaction_unlock(tdb, F_WRLCK);
 
        return ret;
 }