Rewrote the tdb transaction code to be O(N) instead of O(N^2)
authorAndrew Tridgell <tridge@samba.org>
Sat, 5 Jan 2008 06:19:47 +0000 (17:19 +1100)
committerAndrew Tridgell <tridge@samba.org>
Sat, 5 Jan 2008 06:19:47 +0000 (17:19 +1100)
The previous transaction code was fast as long as you didn't do too
many writes within the transaction. The new code is a bit slower for
very small numbers of writes, but scales linearly as the number of
writes increases. The old code scaled as O(N^2) with the number of
writes, making it unusable for large N.

After testing, this needs to be merged into the Samba version of tdb,
along with many of the other recent tdb changes in the ctdb tree.

lib/tdb/common/transaction.c

index 64a5b3f3f2224e074ea2e1df79535a366b05a930..2da09face0282558cd23d29dac97240de2285b29 100644 (file)
 
 */
 
-struct tdb_transaction_el {
-       struct tdb_transaction_el *next, *prev;
-       tdb_off_t offset;
-       tdb_len_t length;
-       unsigned char *data;
-};
-
 /*
   hold the context of any current transaction
 */
 struct tdb_transaction {
        /* we keep a mirrored copy of the tdb hash heads here so
           tdb_next_hash_chain() can operate efficiently */
-       u32 *hash_heads;
+       uint32_t *hash_heads;
 
        /* the original io methods - used to do IOs to the real db */
        const struct tdb_methods *io_methods;
 
-       /* the list of transaction elements. We use a doubly linked
-          list with a last pointer to allow us to keep the list
-          ordered, with first element at the front of the list. It
-          needs to be doubly linked as the read/write traversals need
-          to be backwards, while the commit needs to be forwards */
-       struct tdb_transaction_el *elements, *elements_last;
+       /* the list of transaction blocks. When a block is first
+          written to, it gets created in this list */
+       uint8_t **blocks;
+       uint32_t num_blocks;
+       uint32_t block_size;      /* bytes in each block */
+       uint32_t last_block_size; /* number of valid bytes in the last block */
 
        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
@@ -134,52 +127,48 @@ struct tdb_transaction {
 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
                            tdb_len_t len, int cv)
 {
-       struct tdb_transaction_el *el;
-
-       /* we need to walk the list backwards to get the most recent data */
-       for (el=tdb->transaction->elements_last;el;el=el->prev) {
-               tdb_len_t partial;
+       uint32_t blk;
 
-               if (off+len <= el->offset) {
-                       continue;
-               }
-               if (off >= el->offset + el->length) {
-                       continue;
+       /* break it down into block sized ops */
+       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
+               tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+               if (transaction_read(tdb, off, buf, len2, cv) != 0) {
+                       return -1;
                }
+               len -= len2;
+               off += len2;
+               buf = (void *)(len2 + (char *)buf);
+       }
 
-               /* an overlapping read - needs to be split into up to
-                  2 reads and a memcpy */
-               if (off < el->offset) {
-                       partial = el->offset - off;
-                       if (transaction_read(tdb, off, buf, partial, cv) != 0) {
-                               goto fail;
-                       }
-                       len -= partial;
-                       off += partial;
-                       buf = (void *)(partial + (char *)buf);
-               }
-               if (off + len <= el->offset + el->length) {
-                       partial = len;
-               } else {
-                       partial = el->offset + el->length - off;
-               }
-               memcpy(buf, el->data + (off - el->offset), partial);
-               if (cv) {
-                       tdb_convert(buf, len);
-               }
-               len -= partial;
-               off += partial;
-               buf = (void *)(partial + (char *)buf);
-               
-               if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
+       if (len == 0) {
+               return 0;
+       }
+
+       blk = off / tdb->transaction->block_size;
+
+       /* see if we have it in the block list */
+       if (tdb->transaction->num_blocks <= blk ||
+           tdb->transaction->blocks[blk] == NULL) {
+               /* nope, do a real read */
+               if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
                        goto fail;
                }
-
                return 0;
        }
 
-       /* its not in the transaction elements - do a real read */
-       return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
+       /* it is in the block list. Now check for the last block */
+       if (blk == tdb->transaction->num_blocks-1) {
+               if (len > tdb->transaction->last_block_size) {
+                       goto fail;
+               }
+       }
+       
+       /* now copy it out of this block */
+       memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
+       if (cv) {
+               tdb_convert(buf, len);
+       }
+       return 0;
 
 fail:
        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
@@ -195,120 +184,98 @@ fail:
 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
                             const void *buf, tdb_len_t len)
 {
-       struct tdb_transaction_el *el, *best_el=NULL;
+       uint32_t blk;
 
-       if (len == 0) {
-               return 0;
-       }
-       
        /* if the write is to a hash head, then update the transaction
           hash heads */
        if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
            off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
-               u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
+               uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
                memcpy(&tdb->transaction->hash_heads[chain], buf, len);
        }
 
-       /* first see if we can replace an existing entry */
-       for (el=tdb->transaction->elements_last;el;el=el->prev) {
-               tdb_len_t partial;
-
-               if (best_el == NULL && off == el->offset+el->length) {
-                       best_el = el;
+       /* break it up into block sized chunks */
+       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
+               tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+               if (transaction_write(tdb, off, buf, len2) != 0) {
+                       return -1;
                }
-
-               if (off+len <= el->offset) {
-                       continue;
-               }
-               if (off >= el->offset + el->length) {
-                       continue;
+               len -= len2;
+               off += len2;
+               if (buf != NULL) {
+                       buf = (const void *)(len2 + (const char *)buf);
                }
+       }
 
-               /* an overlapping write - needs to be split into up to
-                  2 writes and a memcpy */
-               if (off < el->offset) {
-                       partial = el->offset - off;
-                       if (transaction_write(tdb, off, buf, partial) != 0) {
-                               goto fail;
-                       }
-                       len -= partial;
-                       off += partial;
-                       buf = (const void *)(partial + (const char *)buf);
-               }
-               if (off + len <= el->offset + el->length) {
-                       partial = len;
+       if (len == 0) {
+               return 0;
+       }
+
+       blk = off / tdb->transaction->block_size;
+       off = off % tdb->transaction->block_size;
+
+       if (tdb->transaction->num_blocks <= blk) {
+               uint8_t **new_blocks;
+               /* expand the blocks array */
+               if (tdb->transaction->blocks == NULL) {
+                       new_blocks = malloc((blk+1)*sizeof(uint8_t *));
                } else {
-                       partial = el->offset + el->length - off;
+                       new_blocks = realloc(tdb->transaction->blocks, (blk+1)*sizeof(uint8_t *));
                }
-               memcpy(el->data + (off - el->offset), buf, partial);
-               len -= partial;
-               off += partial;
-               buf = (const void *)(partial + (const char *)buf);
-               
-               if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
+               if (new_blocks == NULL) {
+                       tdb->ecode = TDB_ERR_OOM;
                        goto fail;
                }
-
-               return 0;
+               memset(&new_blocks[tdb->transaction->num_blocks], 0, 
+                      (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
+               tdb->transaction->blocks = new_blocks;
+               tdb->transaction->num_blocks = blk+1;
+               tdb->transaction->last_block_size = 0;
        }
 
-       /* see if we can append the new entry to an existing entry */
-       if (best_el && best_el->offset + best_el->length == off && 
-           (off+len < tdb->transaction->old_map_size ||
-            off > tdb->transaction->old_map_size)) {
-               unsigned char *data = best_el->data;
-               el = best_el;
-               el->data = (unsigned char *)realloc(el->data,
-                                                   el->length + len);
-               if (el->data == NULL) {
+       /* allocate and fill a block? */
+       if (tdb->transaction->blocks[blk] == NULL) {
+               tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
+               if (tdb->transaction->blocks[blk] == NULL) {
                        tdb->ecode = TDB_ERR_OOM;
                        tdb->transaction->transaction_error = 1;
-                       el->data = data;
-                       return -1;
+                       return -1;                      
                }
-               if (buf) {
-                       memcpy(el->data + el->length, buf, len);
-               } else {
-                       memset(el->data + el->length, TDB_PAD_BYTE, len);
+               if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
+                       tdb_len_t len2 = tdb->transaction->block_size;
+                       if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
+                               len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
+                       }
+                       if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
+                                                                  tdb->transaction->blocks[blk], 
+                                                                  len2, 0) != 0) {
+                               SAFE_FREE(tdb->transaction->blocks[blk]);                               
+                               tdb->ecode = TDB_ERR_IO;
+                               goto fail;
+                       }
+                       if (blk == tdb->transaction->num_blocks-1) {
+                               tdb->transaction->last_block_size = len2;
+                       }                       
                }
-               el->length += len;
-               return 0;
        }
-
-       /* add a new entry at the end of the list */
-       el = (struct tdb_transaction_el *)malloc(sizeof(*el));
-       if (el == NULL) {
-               tdb->ecode = TDB_ERR_OOM;
-               tdb->transaction->transaction_error = 1;                
-               return -1;
-       }
-       el->next = NULL;
-       el->prev = tdb->transaction->elements_last;
-       el->offset = off;
-       el->length = len;
-       el->data = (unsigned char *)malloc(len);
-       if (el->data == NULL) {
-               free(el);
-               tdb->ecode = TDB_ERR_OOM;
-               tdb->transaction->transaction_error = 1;                
-               return -1;
-       }
-       if (buf) {
-               memcpy(el->data, buf, len);
+       
+       /* overwrite part of an existing block */
+       if (buf == NULL) {
+               memset(tdb->transaction->blocks[blk] + off, 0, len);
        } else {
-               memset(el->data, TDB_PAD_BYTE, len);
+               memcpy(tdb->transaction->blocks[blk] + off, buf, len);
        }
-       if (el->prev) {
-               el->prev->next = el;
-       } else {
-               tdb->transaction->elements = el;
+       if (blk == tdb->transaction->num_blocks-1) {
+               if (len + off > tdb->transaction->last_block_size) {
+                       tdb->transaction->last_block_size = len + off;
+               }
        }
-       tdb->transaction->elements_last = el;
+
        return 0;
 
 fail:
-       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
-       tdb->ecode = TDB_ERR_IO;
+       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
+                (blk*tdb->transaction->block_size) + off, len));
        tdb->transaction->transaction_error = 1;
        return -1;
 }
@@ -316,9 +283,9 @@ fail:
 /*
   accelerated hash chain head search, using the cached hash heads
 */
-static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
+static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
 {
-       u32 h = *chain;
+       uint32_t h = *chain;
        for (;h < tdb->header.hash_size;h++) {
                /* the +1 takes account of the freelist */
                if (0 != tdb->transaction->hash_heads[h+1]) {
@@ -419,10 +386,14 @@ int tdb_transaction_start(struct tdb_context *tdb)
                return -1;
        }
 
+       /* a page at a time seems like a reasonable compromise between compactness and efficiency */
+       tdb->transaction->block_size = tdb->page_size;
+
        /* get the transaction write lock. This is a blocking lock. As
           discussed with Volker, there are a number of ways we could
           make this async, which we will probably do in the future */
        if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
+               SAFE_FREE(tdb->transaction->blocks);
                SAFE_FREE(tdb->transaction);
                return -1;
        }
@@ -437,8 +408,8 @@ int tdb_transaction_start(struct tdb_context *tdb)
 
        /* setup a copy of the hash table heads so the hash scan in
           traverse can be fast */
-       tdb->transaction->hash_heads = (u32 *)
-               calloc(tdb->header.hash_size+1, sizeof(u32));
+       tdb->transaction->hash_heads = (uint32_t *)
+               calloc(tdb->header.hash_size+1, sizeof(uint32_t));
        if (tdb->transaction->hash_heads == NULL) {
                tdb->ecode = TDB_ERR_OOM;
                goto fail;
@@ -460,21 +431,12 @@ int tdb_transaction_start(struct tdb_context *tdb)
        tdb->transaction->io_methods = tdb->methods;
        tdb->methods = &transaction_methods;
 
-       /* by calling this transaction write here, we ensure that we don't grow the
-          transaction linked list due to hash table updates */
-       if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
-                             TDB_HASHTABLE_SIZE(tdb)) != 0) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
-               tdb->ecode = TDB_ERR_IO;
-               tdb->methods = tdb->transaction->io_methods;
-               goto fail;
-       }
-
        return 0;
        
 fail:
        tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
        tdb_transaction_unlock(tdb);
+       SAFE_FREE(tdb->transaction->blocks);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
        return -1;
@@ -486,6 +448,8 @@ fail:
 */
 int tdb_transaction_cancel(struct tdb_context *tdb)
 {      
+       int i;
+
        if (tdb->transaction == NULL) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
                return -1;
@@ -499,13 +463,13 @@ int tdb_transaction_cancel(struct tdb_context *tdb)
 
        tdb->map_size = tdb->transaction->old_map_size;
 
-       /* free all the transaction elements */
-       while (tdb->transaction->elements) {
-               struct tdb_transaction_el *el = tdb->transaction->elements;
-               tdb->transaction->elements = el->next;
-               free(el->data);
-               free(el);
+       /* free all the transaction blocks */
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               if (tdb->transaction->blocks[i] != NULL) {
+                       free(tdb->transaction->blocks[i]);
+               }
        }
+       SAFE_FREE(tdb->transaction->blocks);
 
        /* remove any global lock created during the transaction */
        if (tdb->global_lock.count != 0) {
@@ -515,7 +479,6 @@ int tdb_transaction_cancel(struct tdb_context *tdb)
 
        /* remove any locks created during the transaction */
        if (tdb->num_locks != 0) {
-               int i;
                for (i=0;i<tdb->num_lockrecs;i++) {
                        tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
                                   F_UNLCK,F_SETLKW, 0, 1);
@@ -567,16 +530,24 @@ static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t
 */
 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
 {
-       struct tdb_transaction_el *el;
        tdb_len_t recovery_size = 0;
+       int i;
 
-       recovery_size = sizeof(u32);
-       for (el=tdb->transaction->elements;el;el=el->next) {
-               if (el->offset >= tdb->transaction->old_map_size) {
+       recovery_size = sizeof(uint32_t);
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
+                       break;
+               }
+               if (tdb->transaction->blocks[i] == NULL) {
                        continue;
                }
-               recovery_size += 2*sizeof(tdb_off_t) + el->length;
-       }
+               recovery_size += 2*sizeof(tdb_off_t);
+               if (i == tdb->transaction->num_blocks-1) {
+                       recovery_size += tdb->transaction->last_block_size;
+               } else {
+                       recovery_size += tdb->transaction->block_size;
+               }
+       }       
 
        return recovery_size;
 }
@@ -669,14 +640,14 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
 static int transaction_setup_recovery(struct tdb_context *tdb, 
                                      tdb_off_t *magic_offset)
 {
-       struct tdb_transaction_el *el;
        tdb_len_t recovery_size;
        unsigned char *data, *p;
        const struct tdb_methods *methods = tdb->transaction->io_methods;
        struct list_struct *rec;
        tdb_off_t recovery_offset, recovery_max_size;
        tdb_off_t old_map_size = tdb->transaction->old_map_size;
-       u32 magic, tailer;
+       uint32_t magic, tailer;
+       int i;
 
        /*
          check that the recovery area has enough space
@@ -704,30 +675,43 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
        p = data + sizeof(*rec);
-       for (el=tdb->transaction->elements;el;el=el->next) {
-               if (el->offset >= old_map_size) {
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               tdb_off_t offset;
+               tdb_len_t length;
+
+               if (tdb->transaction->blocks[i] == NULL) {
                        continue;
                }
-               if (el->offset + el->length > tdb->transaction->old_map_size) {
+
+               offset = i * tdb->transaction->block_size;
+               length = tdb->transaction->block_size;
+               if (i == tdb->transaction->num_blocks-1) {
+                       length = tdb->transaction->last_block_size;
+               }
+               
+               if (offset >= old_map_size) {
+                       continue;
+               }
+               if (offset + length > tdb->transaction->old_map_size) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
                        free(data);
                        tdb->ecode = TDB_ERR_CORRUPT;
                        return -1;
                }
-               memcpy(p, &el->offset, 4);
-               memcpy(p+4, &el->length, 4);
+               memcpy(p, &offset, 4);
+               memcpy(p+4, &length, 4);
                if (DOCONV()) {
                        tdb_convert(p, 8);
                }
                /* the recovery area contains the old data, not the
                   new data, so we have to call the original tdb_read
                   method to get it */
-               if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
+               if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
                        free(data);
                        tdb->ecode = TDB_ERR_IO;
                        return -1;
                }
-               p += 8 + el->length;
+               p += 8 + length;
        }
 
        /* and the tailer */
@@ -779,7 +763,8 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 {      
        const struct tdb_methods *methods;
        tdb_off_t magic_offset = 0;
-       u32 zero = 0;
+       uint32_t zero = 0;
+       int i;
 
        if (tdb->transaction == NULL) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
@@ -799,7 +784,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
        }               
 
        /* check for a null transaction */
-       if (tdb->transaction->elements == NULL) {
+       if (tdb->transaction->blocks == NULL) {
                tdb_transaction_cancel(tdb);
                return 0;
        }
@@ -858,10 +843,21 @@ int tdb_transaction_commit(struct tdb_context *tdb)
        }
 
        /* perform all the writes */
-       while (tdb->transaction->elements) {
-               struct tdb_transaction_el *el = tdb->transaction->elements;
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               tdb_off_t offset;
+               tdb_len_t length;
 
-               if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
+               if (tdb->transaction->blocks[i] == NULL) {
+                       continue;
+               }
+
+               offset = i * tdb->transaction->block_size;
+               length = tdb->transaction->block_size;
+               if (i == tdb->transaction->num_blocks-1) {
+                       length = tdb->transaction->last_block_size;
+               }
+
+               if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
                        
                        /* we've overwritten part of the data and
@@ -876,11 +872,12 @@ int tdb_transaction_commit(struct tdb_context *tdb)
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
                        return -1;
                }
-               tdb->transaction->elements = el->next;
-               free(el->data); 
-               free(el);
+               SAFE_FREE(tdb->transaction->blocks[i]);
        } 
 
+       SAFE_FREE(tdb->transaction->blocks);
+       tdb->transaction->num_blocks = 0;
+
        if (!(tdb->flags & TDB_NOSYNC)) {
                /* ensure the new data is on disk */
                if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
@@ -932,7 +929,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
 {
        tdb_off_t recovery_head, recovery_eof;
        unsigned char *data, *p;
-       u32 zero = 0;
+       uint32_t zero = 0;
        struct list_struct rec;
 
        /* find the recovery area */
@@ -986,7 +983,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        /* recover the file data */
        p = data;
        while (p+8 < data + rec.data_len) {
-               u32 ofs, len;
+               uint32_t ofs, len;
                if (DOCONV()) {
                        tdb_convert(p, 8);
                }