r7562: work in progress
authorDerrell Lipman <derrell@samba.org>
Tue, 14 Jun 2005 03:08:34 +0000 (03:08 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 18:18:09 +0000 (13:18 -0500)
source/lib/ldb/ldb_sqlite3/ldb_sqlite3.c

index 4089f0bdb5884ef16f97d77476f08d5e84f249ae..dbbb77592894281e452e79fb22b8cac0a59467ef 100644 (file)
 # define TRUE   (! FALSE)
 #endif
 
+#define FILTER_ATTR_TABLE       "temp_filter_attrs"
+#define RESULT_ATTR_TABLE       "temp_result_attrs"
+
 #define QUERY_NOROWS(lsqlite3, bRollbackOnError, sql...)        \
-    do {                                                        \
-            if (query_norows(lsqlite3, sql) != 0) {             \
-                if (bRollbackOnError) {                         \
-                        query_norows(lsqlite3,                  \
-                                       "ROLLBACK;");            \
+        do {                                                    \
+                if (query_norows(lsqlite3, sql) != 0) {         \
+                        if (bRollbackOnError) {                 \
+                                query_norows(lsqlite3,          \
+                                             "ROLLBACK;");      \
+                        }                                       \
+                        return -1;                              \
                 }                                               \
-                return -1;                                      \
-        }                                                       \
-    } while (0)
+        } while (0)
 
 #define QUERY_INT(lsqlite3, result_var, bRollbackOnError, sql...)       \
-    do {                                                                \
-            if (query_int(lsqlite3, &result_var, sql) != 0) {           \
-                    if (bRollbackOnError) {                             \
-                            query_norows(lsqlite3,                      \
-                                                  "ROLLBACK;");         \
-                    }                                                   \
-                    return -1;                                          \
-            }                                                           \
-    } while (0)
+        do {                                                            \
+                if (query_int(lsqlite3, &result_var, sql) != 0) {       \
+                        if (bRollbackOnError) {                         \
+                                query_norows(lsqlite3,                  \
+                                             "ROLLBACK;");              \
+                        }                                               \
+                        return -1;                                      \
+                }                                                       \
+        } while (0)
 
 
 /*
@@ -83,13 +86,21 @@ static int
 lsqlite3_delete(struct ldb_module *module,
                 const char *dn);
 
+static int
+lsqlite3_search_bytree(struct ldb_module * module,
+                       const char * pBaseDN,
+                       enum ldb_scope scope,
+                       struct ldb_parse_tree * pTree,
+                       const char * const * attrs,
+                       struct ldb_message *** pppRes);
+
 static int
 lsqlite3_search(struct ldb_module * module,
                 const char * pBaseDN,
                 enum ldb_scope scope,
                 const char * pExpression,
                 const char * const attrs[],
-                struct ldb_message *** res);
+                struct ldb_message *** pppRes);
 
 static int
 lsqlite3_add(struct ldb_module *module,
@@ -131,15 +142,28 @@ query_int(const struct lsqlite3_private * lsqlite3,
 static int case_fold_attr_required(void * hUserData,
                                    char *attr);
 
+static int
+add_msg_attr(void * hTalloc,
+             long long eid,
+             const char * pDN,
+             const char * pAttrName,
+             const char * pAttrValue,
+             long long prevEID,
+             int * pAllocated,
+             struct ldb_message *** pppRes);
+
 static char *
 parsetree_to_sql(struct ldb_module *module,
-                          char * hTalloc,
+                 char * hTalloc,
                  const struct ldb_parse_tree *t);
 
+static int
+parsetree_to_attrlist(struct lsqlite3_private * lsqlite3,
+                      const struct ldb_parse_tree * t);
+
 static char *
-parsetree_to_tablelist(struct ldb_module *module,
-                       char * hTalloc,
-                       const struct ldb_parse_tree *t);
+build_attr_table_list(void * hTalloc,
+                      struct lsqlite3_private * lsqlite3);
 
 static int
 msg_to_sql(struct ldb_module * module,
@@ -193,54 +217,51 @@ lsqlite3_connect(const char *url,
         int                         ret;
        struct ldb_context *        ldb = NULL;
        struct lsqlite3_private *   lsqlite3 = NULL;
-
+        
        ldb = talloc(NULL, struct ldb_context);
        if (!ldb) {
-               errno = ENOMEM;
                goto failed;
        }
-
+        
        lsqlite3 = talloc(ldb, struct lsqlite3_private);
        if (!lsqlite3) {
-               errno = ENOMEM;
                goto failed;
        }
-
+        
        lsqlite3->sqlite = NULL;
        lsqlite3->options = NULL;
         lsqlite3->lock_count = 0;
-
+        
        ret = initialize(lsqlite3, url);
        if (ret != SQLITE_OK) {
                goto failed;
        }
-
+        
        talloc_set_destructor(lsqlite3, destructor);
-
+        
        ldb->modules = talloc(ldb, struct ldb_module);
        if (!ldb->modules) {
-               errno = ENOMEM;
                goto failed;
        }
        ldb->modules->ldb = ldb;
        ldb->modules->prev = ldb->modules->next = NULL;
        ldb->modules->private_data = lsqlite3;
        ldb->modules->ops = &lsqlite3_ops;
-
+        
        if (options) {
                /*
                  * take a copy of the options array, so we don't have to rely
                  * on the caller keeping it around (it might be dynamic)
                  */
                for (i=0;options[i];i++) ;
-
+                
                lsqlite3->options = talloc_array(lsqlite3, char *, i+1);
                if (!lsqlite3->options) {
                        goto failed;
                }
-               
+                
                for (i=0;options[i];i++) {
-
+                        
                        lsqlite3->options[i+1] = NULL;
                        lsqlite3->options[i] =
                                 talloc_strdup(lsqlite3->options, options[i]);
@@ -249,9 +270,9 @@ lsqlite3_connect(const char *url,
                        }
                }
        }
-
+        
        return ldb;
-
+        
 failed:
         if (lsqlite3->sqlite != NULL) {
                 (void) sqlite3_close(lsqlite3->sqlite);
@@ -275,7 +296,7 @@ lsqlite3_rename(struct ldb_module * module,
        if (olddn[0] == '@' ||newdn[0] == '@') {
                return 0;
        }
-
+        
 #warning "lsqlite3_rename() is not yet supported"
         return -1;
 }
@@ -289,42 +310,44 @@ lsqlite3_delete(struct ldb_module *module,
        if (dn[0] == '@') {
                return 0;
        }
-       
+        
 #warning "lsqlite3_delete() is not yet supported"
         return -1;
 }
 
-/* search for matching records */
+/* search for matching records, by tree */
 static int
-lsqlite3_search(struct ldb_module * module,
-                const char * pBaseDN,
-                enum ldb_scope scope,
-                const char * pExpression,
-                const char * const attrs[],
-                struct ldb_message *** res)
+lsqlite3_search_bytree(struct ldb_module * module,
+                       const char * pBaseDN,
+                       enum ldb_scope scope,
+                       struct ldb_parse_tree * pTree,
+                       const char * const * attrs,
+                       struct ldb_message *** pppRes)
 {
+        int                         i;
         int                         ret;
+        int                         allocated;
         int                         bLoop;
         long long                   eid = 0;
         long long                   prevEID;
         char *                      pSql = NULL;
        char *                      pSqlConstraints;
         char *                      pTableList;
-        char *                      hTalloc;
+        char *                      hTalloc = NULL;
         const char *                pDN;
         const char *                pAttrName;
         const char *                pAttrValue;
+        const char * const *        pRequestedAttrs;
         sqlite3_stmt *              pStmt;
-       struct ldb_parse_tree *     pTree;
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-       
+        
        if (pBaseDN == NULL) {
                pBaseDN = "";
        }
-
+        
         /* Begin a transaction */
         QUERY_NOROWS(lsqlite3, FALSE, "BEGIN IMMEDIATE;");
-
+        
         /*
          * Obtain the eid of the base DN
          */
@@ -335,24 +358,60 @@ lsqlite3_search(struct ldb_module * module,
                   "  FROM ldb_attr_dn "
                   "  WHERE attr_value = %Q;",
                   pBaseDN);
-
-        /* Parse the filter expression into a tree we can work with */
-       if ((pTree = ldb_parse_tree(module->ldb, pExpression)) == NULL) {
-               return -1;
-       }
-       
+        
         /* Allocate a temporary talloc context */
-       hTalloc = talloc_new(module->ldb);
-
-        /* Move the parse tree to our temporary context */
-       talloc_steal(hTalloc, pTree);
-       
-        /* Convert filter into a series of SQL statements (constraints) */
+       if ((hTalloc = talloc_new(module->ldb)) == NULL) {
+                ret = -1;
+                talloc_free(pTree);
+                goto cleanup;
+        }
+        
+        /* Convert filter into a series of SQL conditions (constraints) */
        pSqlConstraints = parsetree_to_sql(module, hTalloc, pTree);
-       
-        /* Get the list of attribute names to use as our extra table list */
-        pTableList = parsetree_to_tablelist(module, hTalloc, pTree);
-
+        
+        /* Ensure we're starting with an empty result attribute table */
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "DELETE FROM " RESULT_ATTR_TABLE " "
+                     "  WHERE 1;");/* avoid a schema change with WHERE 1 */
+        
+        /* Insert the list of requested attributes into this table */
+        for (pRequestedAttrs = (const char * const *) attrs;
+             pRequestedAttrs != NULL;
+             pRequestedAttrs++) {
+                
+                QUERY_NOROWS(lsqlite3,
+                             FALSE,
+                             "INSERT OR IGNORE INTO " RESULT_ATTR_TABLE " "
+                             "    (attr_name) "
+                             "  VALUES "
+                             "    (%Q);",
+                             *pRequestedAttrs);
+        }
+        
+        /* Ensure we're starting with an empty filter attribute table */
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "DELETE FROM " FILTER_ATTR_TABLE " "
+                     "  WHERE 1;");/* avoid a schema change with WHERE 1 */
+        
+        /*
+         * Create a table of unique attribute names for our extra table list
+         */
+        if (parsetree_to_attrlist(lsqlite3, pTree) != 0) {
+                ret = -1;
+                goto cleanup;
+        }
+        
+        /*
+         * Build the attribute table list from the list of unique names.
+         */
+        
+        if ((pTableList = build_attr_table_list(hTalloc, lsqlite3)) == NULL) {
+                ret = -1;
+                goto cleanup;
+        }
+        
         switch(scope) {
         case LDB_SCOPE_DEFAULT:
         case LDB_SCOPE_SUBTREE:
@@ -373,12 +432,15 @@ lsqlite3_search(struct ldb_module * module,
                         "         AND ldap_entry.eid IN\n%s\n"
                         "    ) "
                         "    AND av.eid = entry.eid "
+                        "    AND av.attr_name IN "
+                        "          (SELECT attr_name "
+                        "             FROM " RESULT_ATTR_TABLE ") "
                         "  ORDER BY av.eid, av.attr_name;",
                         pTableList,
                         eid,
                         pSqlConstraints);
                 break;
-
+                
         case LDB_SCOPE_BASE:
                 pSql = sqlite3_mprintf(
                         "SELECT entry.eid,\n"
@@ -394,12 +456,15 @@ lsqlite3_search(struct ldb_module * module,
                         "         AND ldb_entry.eid IN\n%s\n"
                         "    ) "
                         "    AND av.eid = entry.eid "
+                        "    AND av.attr_name IN "
+                        "          (SELECT attr_name "
+                        "             FROM " RESULT_ATTR_TABLE ") "
                         "  ORDER BY av.eid, av.attr_name;",
                         pTableList,
                         eid,
                         pSqlConstraints);
                 break;
-
+                
         case LDB_SCOPE_ONELEVEL:
                 pSql = sqlite3_mprintf(
                         "SELECT entry.eid,\n"
@@ -417,70 +482,96 @@ lsqlite3_search(struct ldb_module * module,
                         "         AND ldb_entry.eid IN\n%s\n"
                         "    ) "
                         "    AND av.eid = entry.eid "
+                        "    AND av.attr_name IN "
+                        "          (SELECT attr_name "
+                        "             FROM " RESULT_ATTR_TABLE ") "
                         "  ORDER BY av.eid, av.attr_name;",
                         pTableList,
                         eid,
                         pSqlConstraints);
                 break;
         }
-
-        /* Initially, we have no old eid */
-        prevEID = -1;
-
+        
         /*
          * Prepare and execute the SQL statement.  Loop allows retrying on
          * certain errors, e.g. SQLITE_SCHEMA occurs if the schema changes,
          * requiring retrying the operation.
          */
         for (bLoop = TRUE; bLoop; ) {
-
+                /* There are no allocate message structures yet */
+                allocated = 0;
+                
+                for (i = 0; i < allocated; i++) {
+                        (*pppRes)[i] = NULL;
+                }
+                
                 /* Compile the SQL statement into sqlite virtual machine */
                 if ((ret = sqlite3_prepare(lsqlite3->sqlite,
                                            pSql,
                                            -1,
                                            &pStmt,
                                            NULL)) == SQLITE_SCHEMA) {
+                        talloc_free(*pppRes);
                         continue;
                 } else if (ret != SQLITE_OK) {
                         ret = -1;
                         break;
                 }
                 
+                /* Initially, we have no previous eid */
+                prevEID = -1;
+                
                 /* Loop through the returned rows */
-                do {
+                for (ret = SQLITE_ROW; ret == SQLITE_ROW; ) {
+                        
                         /* Get the next row */
                         if ((ret = sqlite3_step(pStmt)) == SQLITE_ROW) {
-
+                                
                                 /* Get the values from this row */
                                 eid = sqlite3_column_int64(pStmt, 0);
                                 pDN = sqlite3_column_text(pStmt, 1);
                                 pAttrName = sqlite3_column_text(pStmt, 2);
                                 pAttrValue = sqlite3_column_text(pStmt, 3);
                                 
-                                /* Add this row data to the return results */
-#warning "finish returning the result set of the search here"
+                                /* Add this result to the result set */
+                                if ((ret = add_msg_attr(hTalloc,
+                                                        eid,
+                                                        pDN,
+                                                        pAttrName,
+                                                        pAttrValue,
+                                                        prevEID,
+                                                        &allocated,
+                                                        pppRes)) != 0) {
+                                        
+                                        (void) sqlite3_finalize(pStmt);
+                                        ret = -1;
+                                        break;
+                                }
                         }
-                } while (ret == SQLITE_ROW);
-
+#warning "finish returning the result set of the search here"
+                }
+                
                 if (ret == SQLITE_SCHEMA) {
                         (void) sqlite3_finalize(pStmt);
+                        talloc_free(*pppRes);
                         continue;
                 } else if (ret != SQLITE_DONE) {
                         (void) sqlite3_finalize(pStmt);
                         ret = -1;
                         break;
                 }
-
+                
                 /* Free the virtual machine */
                 if ((ret = sqlite3_finalize(pStmt)) == SQLITE_SCHEMA) {
                         (void) sqlite3_finalize(pStmt);
+                        talloc_free(*pppRes);
                         continue;
                 } else if (ret != SQLITE_OK) {
                         (void) sqlite3_finalize(pStmt);
                         ret = -1;
                         break;
                 }
-
+                
                 /*
                  * Normal condition is only one time through loop.  Loop is
                  * rerun in error conditions, via "continue", above.
@@ -488,15 +579,66 @@ lsqlite3_search(struct ldb_module * module,
                 ret = 0;
                 bLoop = FALSE;
         }
-
-
+        
         /* End the transaction */
         QUERY_NOROWS(lsqlite3, FALSE, "END TRANSACTION;");
-
+        
         /* We're alll done with this query */
         sqlite3_free(pSql);
+        
+        /* Were there any results? */
+        if (ret != 0 || allocated == 0) {
+                /* Nope.  We can free the results. */
+                talloc_free(pppRes);
+        }
+        
+cleanup:
+        /* Clean up our temporary tables */
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "DELETE FROM " RESULT_ATTR_TABLE " "
+                     "  WHERE 1;");/* avoid a schema change with WHERE 1 */
+        
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "DELETE FROM " FILTER_ATTR_TABLE " "
+                     "  WHERE 1;");/* avoid a schema change with WHERE 1 */
+        
+        
+        if (hTalloc != NULL) {
+                talloc_free(hTalloc);
+        }
+        
+        /* If error, return error code; otherwise return number of results */
+       return ret == 0 ? allocated : ret;
+}
 
-       return ret;
+/* search for matching records, by expression */
+static int
+lsqlite3_search(struct ldb_module * module,
+                const char * pBaseDN,
+                enum ldb_scope scope,
+                const char * pExpression,
+                const char * const * attrs,
+                struct ldb_message *** pppRes)
+{
+        int                     ret;
+        struct ldb_parse_tree * pTree;
+        
+        /* Parse the filter expression into a tree we can work with */
+       if ((pTree = ldb_parse_tree(module->ldb, pExpression)) == NULL) {
+                return -1;
+       }
+        
+        /* Now use the bytree function for the remainder of processing */
+        ret = lsqlite3_search_bytree(module, pBaseDN, scope,
+                                     pTree, attrs, pppRes);
+        
+        /* Free the parse tree */
+       talloc_free(pTree);
+        
+        /* All done. */
+        return ret;
 }
 
 
@@ -507,15 +649,15 @@ lsqlite3_add(struct ldb_module *module,
 {
         long long                   eid;
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-
+        
        /* ignore ltdb specials */
        if (msg->dn[0] == '@') {
                return 0;
        }
-
+        
         /* Begin a transaction */
         QUERY_NOROWS(lsqlite3, FALSE, "BEGIN EXCLUSIVE;");
-
+        
         /*
          * Build any portions of the directory tree that don't exist.  If the
          * final component already exists, it's an error.
@@ -524,13 +666,13 @@ lsqlite3_add(struct ldb_module *module,
                 QUERY_NOROWS(lsqlite3, FALSE, "ROLLBACK;");
                 return -1;
         }
-
+        
         /* Add attributes to this new entry */
        if (msg_to_sql(module, msg, eid, FALSE) != 0) {
                 QUERY_NOROWS(lsqlite3, FALSE, "ROLLBACK;");
                 return -1;
         }
-
+        
         /* Everything worked.  Commit it! */
         QUERY_NOROWS(lsqlite3, TRUE, "COMMIT;");
         return 0;
@@ -543,15 +685,15 @@ lsqlite3_modify(struct ldb_module *module,
                 const struct ldb_message *msg)
 {
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-
+        
        /* ignore ltdb specials */
        if (msg->dn[0] == '@') {
                return 0;
        }
-
+        
         /* Begin a transaction */
         QUERY_NOROWS(lsqlite3, FALSE, "BEGIN EXCLUSIVE;");
-
+        
         /* Everything worked.  Commit it! */
         QUERY_NOROWS(lsqlite3, TRUE, "COMMIT;");
         return 0 ;
@@ -565,9 +707,9 @@ lsqlite3_lock(struct ldb_module *module,
        if (lockname == NULL) {
                return -1;
        }
-
+        
        /* TODO implement a local locking mechanism here */
-
+        
        return 0;
 }
 
@@ -579,9 +721,9 @@ lsqlite3_unlock(struct ldb_module *module,
        if (lockname == NULL) {
                return -1;
        }
-
+        
        /* TODO implement a local locking mechanism here */
-
+        
         return 0;
 }
 
@@ -590,7 +732,7 @@ static const char *
 lsqlite3_errstring(struct ldb_module *module)
 {
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-
+        
        return sqlite3_errmsg(lsqlite3->sqlite);
 }
 
@@ -611,22 +753,22 @@ initialize(struct lsqlite3_private *lsqlite3,
         sqlite3_stmt *  stmt;
         const char *    schema =       
                 "-- ------------------------------------------------------"
-
+                
                 "PRAGMA auto_vacuum=1;"
-
+                
                 "-- ------------------------------------------------------"
-
+                
                 "BEGIN EXCLUSIVE;"
-
+                
                 "-- ------------------------------------------------------"
-
+                
                 "CREATE TABLE ldb_info AS"
                 "  SELECT 'LDB' AS database_type,"
                 "         '1.0' AS version;"
-
+                
                 "-- ------------------------------------------------------"
                 "-- Schema"
-
+                
                 "/*"
                 " * The entry table holds the information about an entry. "
                 " * This table is used to obtain the EID of the entry and to "
@@ -638,21 +780,21 @@ initialize(struct lsqlite3_private *lsqlite3,
                 "("
                 "  -- Unique identifier of this LDB entry"
                 "  eid                   INTEGER PRIMARY KEY,"
-
+                
                 "  -- Unique identifier of the parent LDB entry"
                 "  peid                  INTEGER REFERENCES ldb_entry,"
-
+                
                 "  -- Distinguished name of this entry"
                 "  dn                    TEXT,"
-
+                
                 "  -- Time when the entry was created"
                 "  create_timestamp      INTEGER,"
-
+                
                 "  -- Time when the entry was last modified"
                 "  modify_timestamp      INTEGER"
                 ");"
-
-
+                
+                
                 "/*"
                 " * The purpose of the descendant table is to support the"
                 " * subtree search feature.  For each LDB entry with a unique"
@@ -671,24 +813,24 @@ initialize(struct lsqlite3_private *lsqlite3,
                 "("
                 "  -- The unique identifier of the ancestor LDB entry"
                 "  aeid                  INTEGER REFERENCES ldb_entry,"
-
+                
                 "  -- The unique identifier of the descendant LDB entry"
                 "  deid                  INTEGER REFERENCES ldb_entry"
                 ");"
-
-
+                
+                
                 "CREATE TABLE ldb_object_classes"
                 "("
                 "  -- Object classes are inserted into this table to track"
                 "  -- their class hierarchy.  'top' is the top-level class"
                 "  -- of which all other classes are subclasses."
                 "  class_name            TEXT PRIMARY KEY,"
-
+                
                 "  -- tree_key tracks the position of the class in"
                 "  -- the hierarchy"
                 "  tree_key              TEXT UNIQUE"
                 ");"
-
+                
                 "/*"
                 " * We keep a full listing of attribute/value pairs here"
                 " */"
@@ -698,7 +840,7 @@ initialize(struct lsqlite3_private *lsqlite3,
                 "  attr_name             TEXT, -- see ldb_attr_ATTRIBUTE_NAME"
                 "  attr_value            TEXT"
                 ");"
-
+                
                 "/*"
                 " * There is one attribute table per searchable attribute."
                 " */"
@@ -707,20 +849,20 @@ initialize(struct lsqlite3_private *lsqlite3,
                 "("
                 "  -- The unique identifier of the LDB entry"
                 "  eid                   INTEGER REFERENCES ldb_entry,"
-
+                
                 "  -- Normalized attribute value"
                 "  attr_value            TEXT"
                 ");"
                 "*/"
-
-
+                
+                
                 "-- ------------------------------------------------------"
                 "-- Indexes"
-
-
+                
+                
                 "-- ------------------------------------------------------"
                 "-- Triggers"
-
+                
                 "CREATE TRIGGER ldb_entry_insert_tr"
                 "  AFTER INSERT"
                 "  ON ldb_entry"
@@ -731,7 +873,7 @@ initialize(struct lsqlite3_private *lsqlite3,
                 "            modify_timestamp = strftime('%s', 'now')"
                 "        WHERE eid = new.eid;"
                 "    END;"
-
+                
                 "CREATE TRIGGER ldb_entry_update_tr"
                 "  AFTER UPDATE"
                 "  ON ldb_entry"
@@ -741,19 +883,19 @@ initialize(struct lsqlite3_private *lsqlite3,
                 "        SET modify_timestamp = strftime('%s', 'now')"
                 "        WHERE eid = old.eid;"
                 "    END;"
-
+                
                 "-- ------------------------------------------------------"
                 "-- Table initialization"
-
+                
                 "/* We need an implicit 'top' level object class */"
                 "INSERT INTO ldb_attributes (attr_name,"
                 "                            parent_tree_key)"
                 "  SELECT 'top', '';"
-
+                
                 "-- ------------------------------------------------------"
-
+                
                 "COMMIT;"
-
+                
                 "-- ------------------------------------------------------"
                 ;
         
@@ -761,18 +903,18 @@ initialize(struct lsqlite3_private *lsqlite3,
         if (strncmp(url, "sqlite://", 9) != 0) {
                 return SQLITE_MISUSE;
         }
-
+        
         /* Update pointer to just after the protocol indicator */
         url += 9;
-                
+        
         /* Try to open the (possibly empty/non-existent) database */
         if ((ret = sqlite3_open(url, &lsqlite3->sqlite)) != SQLITE_OK) {
                 return ret;
         }
-
+        
         /* Begin a transaction */
         QUERY_NOROWS(lsqlite3, FALSE, "BEGIN EXCLUSIVE;");
-
+        
         /* Determine if this is a new database.  No tables means it is. */
         QUERY_INT(lsqlite3,
                   queryInt,
@@ -780,13 +922,13 @@ initialize(struct lsqlite3_private *lsqlite3,
                   "SELECT COUNT(*) "
                   "  FROM sqlite_master "
                   "  WHERE type = 'table';");
-
+        
         if (queryInt == 0) {
                 /*
                  * Create the database schema
                  */
                 for (pTail = discard_const_p(char, schema); pTail != NULL; ) {
-
+                        
                         if ((ret = sqlite3_prepare(
                                      lsqlite3->sqlite,
                                      pTail,
@@ -795,7 +937,7 @@ initialize(struct lsqlite3_private *lsqlite3,
                                      &pTail)) != SQLITE_OK ||
                             (ret = sqlite3_step(stmt)) != SQLITE_DONE ||
                             (ret = sqlite3_finalize(stmt)) != SQLITE_OK) {
-
+                                
                                 QUERY_NOROWS(lsqlite3, FALSE, "ROLLBACK;");
                                 (void) sqlite3_close(lsqlite3->sqlite);
                                 return ret;
@@ -825,17 +967,39 @@ initialize(struct lsqlite3_private *lsqlite3,
                               "       AND version = '1.0'"
                               "  );") != 0 ||
                     queryInt != 1) {
-                
+                        
                         /* It's not one that we created.  See ya! */
                         QUERY_NOROWS(lsqlite3, FALSE, "ROLLBACK;");
                         (void) sqlite3_close(lsqlite3->sqlite);
                         return SQLITE_MISUSE;
                 }
         }
-
+        
+        /*
+         * Create a temporary table to hold attributes requested in the result
+         * set of a search.
+         */
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "CREATE TEMPORARY TABLE " RESULT_ATTR_TABLE " "
+                     " ("
+                     "  attr_name TEXT PRIMARY KEY "
+                     " );");
+        
+        /*
+         * Create a temporary table to hold the attributes used by filters
+         * during a search.
+         */
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "CREATE TEMPORARY TABLE " FILTER_ATTR_TABLE " "
+                     " ("
+                     "  attr_name TEXT PRIMARY KEY "
+                     " );");
+        
         /* Commit the transaction */
         QUERY_NOROWS(lsqlite3, FALSE, "COMMIT;");
-
+        
         return SQLITE_OK;
 }
 
@@ -843,7 +1007,7 @@ static int
 destructor(void *p)
 {
        struct lsqlite3_private *   lsqlite3 = p;
-
+        
         (void) sqlite3_close(lsqlite3->sqlite);
        return 0;
 }
@@ -871,19 +1035,19 @@ query_norows(const struct lsqlite3_private *lsqlite3,
         
         /* Begin access to variable argument list */
         va_start(args, pSql);
-
+        
         /* Format the query */
         if ((p = sqlite3_vmprintf(pSql, args)) == NULL) {
                 return -1;
         }
-
+        
         /*
          * Prepare and execute the SQL statement.  Loop allows retrying on
          * certain errors, e.g. SQLITE_SCHEMA occurs if the schema changes,
          * requiring retrying the operation.
          */
         for (bLoop = TRUE; bLoop; ) {
-
+                
                 /* Compile the SQL statement into sqlite virtual machine */
                 if ((ret = sqlite3_prepare(lsqlite3->sqlite,
                                            pTail,
@@ -905,7 +1069,7 @@ query_norows(const struct lsqlite3_private *lsqlite3,
                         ret = -1;
                         break;
                 }
-
+                
                 /* Free the virtual machine */
                 if ((ret = sqlite3_finalize(pStmt)) == SQLITE_SCHEMA) {
                         (void) sqlite3_finalize(pStmt);
@@ -915,7 +1079,7 @@ query_norows(const struct lsqlite3_private *lsqlite3,
                         ret = -1;
                         break;
                 }
-
+                
                 /*
                  * Normal condition is only one time through loop.  Loop is
                  * rerun in error conditions, via "continue", above.
@@ -923,13 +1087,13 @@ query_norows(const struct lsqlite3_private *lsqlite3,
                 ret = 0;
                 bLoop = FALSE;
         }
-
+        
         /* All done with variable argument list */
         va_end(args);
-
+        
         /* Free the memory we allocated for our query string */
         sqlite3_free(p);
-
+        
         return ret;
 }
 
@@ -958,19 +1122,19 @@ query_int(const struct lsqlite3_private * lsqlite3,
         
         /* Begin access to variable argument list */
         va_start(args, pSql);
-
+        
         /* Format the query */
         if ((p = sqlite3_vmprintf(pSql, args)) == NULL) {
                 return -1;
         }
-
+        
         /*
          * Prepare and execute the SQL statement.  Loop allows retrying on
          * certain errors, e.g. SQLITE_SCHEMA occurs if the schema changes,
          * requiring retrying the operation.
          */
         for (bLoop = TRUE; bLoop; ) {
-
+                
                 /* Compile the SQL statement into sqlite virtual machine */
                 if ((ret = sqlite3_prepare(lsqlite3->sqlite,
                                            pTail,
@@ -992,10 +1156,10 @@ query_int(const struct lsqlite3_private * lsqlite3,
                         ret = -1;
                         break;
                 }
-
+                
                 /* Get the value to be returned */
                 *pRet = sqlite3_column_int64(pStmt, 0);
-
+                
                 /* Free the virtual machine */
                 if ((ret = sqlite3_finalize(pStmt)) == SQLITE_SCHEMA) {
                         (void) sqlite3_finalize(pStmt);
@@ -1005,7 +1169,7 @@ query_int(const struct lsqlite3_private * lsqlite3,
                         ret = -1;
                         break;
                 }
-
+                
                 /*
                  * Normal condition is only one time through loop.  Loop is
                  * rerun in error conditions, via "continue", above.
@@ -1013,13 +1177,13 @@ query_int(const struct lsqlite3_private * lsqlite3,
                 ret = 0;
                 bLoop = FALSE;
         }
-
+        
         /* All done with variable argument list */
         va_end(args);
-
+        
         /* Free the memory we allocated for our query string */
         sqlite3_free(p);
-
+        
         return ret;
 }
 
@@ -1043,162 +1207,210 @@ case_fold_attr_required(void * hUserData,
  * add a single set of ldap message values to a ldb_message
  */
 
-#warning "add_msg_attr() not yet implemented or used"
-#if 0
 static int
-add_msg_attr(struct ldb_context *ldb,
-                      struct ldb_message *msg, 
-                      const char *attr,
-                      struct berval **bval)
+add_msg_attr(void * hTalloc,
+             long long eid,
+             const char * pDN,
+             const char * pAttrName,
+             const char * pAttrValue,
+             long long prevEID,
+             int * pAllocated,
+             struct ldb_message *** pppRes)
 {
-        int                          i;
-       int                          count;
+        void *                       x;
+        struct ldb_message *         msg;
        struct ldb_message_element * el;
-
-       count = ldap_count_values_len(bval);
-
-       if (count <= 0) {
-               return -1;
-       }
-
-       el = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
-                             msg->num_elements + 1);
-       if (!el) {
-               errno = ENOMEM;
-               return -1;
-       }
-
-       msg->elements = el;
-
-       el = &msg->elements[msg->num_elements];
-
-       el->name = talloc_strdup(msg->elements, attr);
-       if (!el->name) {
-               errno = ENOMEM;
-               return -1;
-       }
-       el->flags = 0;
-
-       el->num_values = 0;
-       el->values = talloc_array(msg->elements, struct ldb_val, count);
-       if (!el->values) {
-               errno = ENOMEM;
-               return -1;
-       }
-
-       for (i=0;i<count;i++) {
-               el->values[i].data = talloc_memdup(el->values, bval[i]->bv_val, bval[i]->bv_len);
-               if (!el->values[i].data) {
-                       return -1;
-               }
-               el->values[i].length = bval[i]->bv_len;
-               el->num_values++;
-       }
-
-       msg->num_elements++;
-
+        
+        /* Is this a different EID than the previous one? */
+        if (eid != prevEID) {
+                /* Yup.  Add another result to the result array */
+                if ((x = talloc_realloc(hTalloc,
+                                        *pAllocated == 0 ? NULL : pppRes,
+                                        struct ldb_message *,
+                                        *pAllocated + 1)) == NULL) {
+                        
+                        return -1;
+                }
+                
+                /* Save the new result list */
+                *pppRes = x;
+                
+                /* We've allocated one more result */
+                *pAllocated++;
+                
+                /* Ensure that the message is initialized */
+                msg = x;
+                msg->dn = NULL;
+                msg->num_elements = 0;
+                msg->elements = NULL;
+                msg->private_data = NULL;
+        } else {
+                /* Same EID.  Point to the previous most-recent message */
+                msg = *pppRes[*pAllocated - 1];
+        }
+        
+        /*
+         * Point to the most recent previous element.  (If there are none,
+         * this will point to non-allocated memory, but the pointer will never
+         * be dereferenced.)
+         */
+        el = &msg->elements[msg->num_elements - 1];
+        
+        /* See if the most recent previous element has the same attr_name */
+        if (msg->num_elements == 0 || strcmp(el->name, pAttrName) != 0) {
+                
+                /* It's a new attr_name.  Allocate another message element */
+                if ((el = talloc_realloc(msg,
+                                         msg->elements,
+                                         struct ldb_message_element, 
+                                         msg->num_elements + 1)) == NULL) {
+                        return -1;
+                }
+                
+                /* Save the new element */
+                msg->elements = el;
+                
+                /* There's now one additional element */
+                msg->num_elements++;
+                
+                /* Save the attribute name */
+                if ((el->name =
+                     talloc_strdup(msg->elements, pAttrName)) == NULL) {
+                        
+                        return -1;
+                }
+                
+                /* No flags */
+                el->flags = 0;
+                
+                /* Initialize number of attribute values for this type */
+                el->num_values = 0;
+                el->values = NULL;
+        }
+        
+        /* Increase the value array size by 1 */
+        if ((el->values =
+             talloc_realloc(el,
+                            el->num_values == 0 ? NULL : el->values,
+                            struct ldb_val,
+                            el->num_values)) == NULL) {
+                return -1;
+        }
+        
+        /* Save the new attribute value length */
+        el->values[el->num_values].length = strlen(pAttrValue) + 1;
+        
+        /* Copy the new attribute value */
+        if (talloc_memdup(el->values[el->num_values].data,
+                          pAttrValue,
+                          el->values[el->num_values].length) == NULL) {
+                return -1;
+        }
+        
+        /* We now have one additional value of this type */
+        el->num_values++;
+        
        return 0;
 }
-#endif
 
 static char *
 parsetree_to_sql(struct ldb_module *module,
-                          char * hTalloc,
-                          const struct ldb_parse_tree *t)
+                 char * hTalloc,
+                 const struct ldb_parse_tree *t)
 {
        int                     i;
        char *                  child;
         char *                  p;
        char *                  ret = NULL;
         char *                  pAttrName;
-       
-
+        
+        
        switch(t->operation) {
-               case LDB_OP_SIMPLE:
-                       break;
-
-               case LDB_OP_EXTENDED:
-#warning  "derrell, you'll need to work out how to handle bitops"
-                       return NULL;
-
-               case LDB_OP_AND:
-                       ret = parsetree_to_sql(module,
-                                               hTalloc,
-                                               t->u.list.elements[0]);
-
-                       for (i = 1; i < t->u.list.num_elements; i++) {
-                               child =
-                                        parsetree_to_sql(
-                                                module,
-                                                hTalloc,
-                                                t->u.list.elements[i]);
-                               ret = talloc_asprintf_append(ret,
-                                                             "INTERSECT\n"
-                                                             "%s\n",
-                                                             child);
-                               talloc_free(child);
-                       }
-
-                        child = ret;
-                        ret = talloc_asprintf("(\n"
-                                              "%s\n"
-                                              ")\n",
-                                              child);
-                        talloc_free(child);
-                       return ret;
-
-               case LDB_OP_OR:
-                       child =
+        case LDB_OP_SIMPLE:
+                break;
+                
+        case LDB_OP_EXTENDED:
+#warning  "work out how to handle bitops"
+                return NULL;
+
+        case LDB_OP_AND:
+                ret = parsetree_to_sql(module,
+                                       hTalloc,
+                                       t->u.list.elements[0]);
+                
+                for (i = 1; i < t->u.list.num_elements; i++) {
+                        child =
                                 parsetree_to_sql(
                                         module,
                                         hTalloc,
-                                        t->u.list.elements[0]);
-
-                       for (i = 1; i < t->u.list.num_elements; i++) {
-                               child =
-                                        parsetree_to_sql(
-                                                module,
-                                                hTalloc,
-                                                t->u.list.elements[i]);
-                               ret = talloc_asprintf_append(ret,
-                                                             "UNION\n"
-                                                             "%s\n",
-                                                             child);
-                               talloc_free(child);
-                       }
-                        child = ret;
-                        ret = talloc_asprintf("(\n"
-                                              "%s\n"
-                                              ")\n",
-                                              child);
+                                        t->u.list.elements[i]);
+                        ret = talloc_asprintf_append(ret,
+                                                     "INTERSECT\n"
+                                                     "%s\n",
+                                                     child);
                         talloc_free(child);
-                       return ret;
-
-               case LDB_OP_NOT:
-                       child =
+                }
+                
+                child = ret;
+                ret = talloc_asprintf("(\n"
+                                      "%s\n"
+                                      ")\n",
+                                      child);
+                talloc_free(child);
+                return ret;
+                
+        case LDB_OP_OR:
+                child =
+                        parsetree_to_sql(
+                                module,
+                                hTalloc,
+                                t->u.list.elements[0]);
+                
+                for (i = 1; i < t->u.list.num_elements; i++) {
+                        child =
                                 parsetree_to_sql(
                                         module,
                                         hTalloc,
-                                        t->u.not.child);
-                       ret = talloc_asprintf(hTalloc,
-                                              "(\n"
-                                              "  SELECT eid\n"
-                                              "    FROM ldb_entry\n"
-                                              "    WHERE eid NOT IN %s\n"
-                                              ")\n",
-                                              child);
-                       talloc_free(child);
-                       return ret;
-
-               default:
-                        /* should never occur */
-                       abort();
+                                        t->u.list.elements[i]);
+                        ret = talloc_asprintf_append(ret,
+                                                     "UNION\n"
+                                                     "%s\n",
+                                                     child);
+                        talloc_free(child);
+                }
+                child = ret;
+                ret = talloc_asprintf("(\n"
+                                      "%s\n"
+                                      ")\n",
+                                      child);
+                talloc_free(child);
+                return ret;
+                
+        case LDB_OP_NOT:
+                child =
+                        parsetree_to_sql(
+                                module,
+                                hTalloc,
+                                t->u.not.child);
+                ret = talloc_asprintf(hTalloc,
+                                      "(\n"
+                                      "  SELECT eid\n"
+                                      "    FROM ldb_entry\n"
+                                      "    WHERE eid NOT IN %s\n"
+                                      ")\n",
+                                      child);
+                talloc_free(child);
+                return ret;
+                
+        default:
+                /* should never occur */
+                abort();
        };
-       
+        
         /* Get a case-folded copy of the attribute name */
         pAttrName = ldb_casefold((struct ldb_context *) module,
                                  t->u.simple.attr);
-
+        
         /*
          * For simple searches, we want to retrieve the list of EIDs that
          * match the criteria.  We accomplish this by searching the
@@ -1219,10 +1431,10 @@ parsetree_to_sql(struct ldb_module *module,
                                          pAttrName)) == NULL) {
                         return NULL;
                 }
-
+                
                 ret = talloc_strdup(hTalloc, p);
                 sqlite3_free(p);
-
+                
        } else if (strcasecmp(t->u.simple.attr, "objectclass") == 0) {
                /*
                  * For object classes, we want to search for all objectclasses
@@ -1243,10 +1455,10 @@ parsetree_to_sql(struct ldb_module *module,
                              t->u.simple.value.data)) == NULL) {
                         return NULL;
                 }
-
+                
                 ret = talloc_strdup(hTalloc, p);
                 sqlite3_free(p);
-
+                
        } else {
                 /* A normal query. */
                 if ((p = sqlite3_mprintf("(\n"
@@ -1258,7 +1470,7 @@ parsetree_to_sql(struct ldb_module *module,
                                          t->u.simple.value.data)) == NULL) {
                         return NULL;
                 }
-
+                
                 ret = talloc_strdup(hTalloc, p);
                 sqlite3_free(p);
        }
@@ -1266,13 +1478,177 @@ parsetree_to_sql(struct ldb_module *module,
 }
 
 
+static int
+parsetree_to_attrlist(struct lsqlite3_private * lsqlite3,
+                      const struct ldb_parse_tree * t)
+{
+       int                         i;
+        
+       switch(t->operation) {
+        case LDB_OP_SIMPLE:
+                break;
+                
+        case LDB_OP_EXTENDED:
+#warning  "work out how to handle bitops"
+                return -1;
+
+        case LDB_OP_AND:
+                if (parsetree_to_attrlist(
+                            lsqlite3,
+                            t->u.list.elements[0]) != 0) {
+                        return -1;
+                }
+                
+                for (i = 1; i < t->u.list.num_elements; i++) {
+                        if (parsetree_to_attrlist(
+                                    lsqlite3,
+                                    t->u.list.elements[i]) != 0) {
+                                return -1;
+                        }
+                }
+                
+                return 0;
+                
+        case LDB_OP_OR:
+                if (parsetree_to_attrlist(
+                            lsqlite3,
+                            t->u.list.elements[0]) != 0) {
+                        return -1;
+                }
+                
+                for (i = 1; i < t->u.list.num_elements; i++) {
+                        if (parsetree_to_attrlist(
+                                    lsqlite3,
+                                    t->u.list.elements[i]) != 0) {
+                                return -1;
+                        }
+                }
+                
+                return 0;
+                
+        case LDB_OP_NOT:
+                if (parsetree_to_attrlist(lsqlite3,
+                                          t->u.not.child) != 0) {
+                        return -1;
+                }
+                
+                return 0;
+                
+        default:
+                /* should never occur */
+                abort();
+       };
+        
+        QUERY_NOROWS(lsqlite3,
+                     FALSE,
+                     "INSERT OR IGNORE INTO " FILTER_ATTR_TABLE " "
+                     "    (attr_name) "
+                     "  VALUES "
+                     "    (%Q);",
+                     t->u.simple.attr);
+       return 0;
+}
+
+
+/*
+ * Use the already-generated FILTER_ATTR_TABLE to create a list of attribute
+ * table names that will be used in search queries.
+ */
 static char *
-parsetree_to_tablelist(struct ldb_module *module,
-                                char * hTalloc,
-                                const struct ldb_parse_tree *t)
+build_attr_table_list(void * hTalloc,
+                      struct lsqlite3_private * lsqlite3)
 {
-#warning "obtain talloc'ed array of attribute names for table list"
-        return NULL;
+        int             ret;
+        int             bLoop;
+        char *          p;
+        char *          pTableList;
+        sqlite3_stmt *  pStmt;
+        
+        /*
+         * Prepare and execute the SQL statement.  Loop allows retrying on
+         * certain errors, e.g. SQLITE_SCHEMA occurs if the schema changes,
+         * requiring retrying the operation.
+         */
+        for (bLoop = TRUE; bLoop; ) {
+                /* Initialize a string to which we'll append each table name */
+                if ((pTableList = talloc_strdup(hTalloc, "")) == NULL) {
+                        return NULL;
+                }
+                
+                /* Compile the SQL statement into sqlite virtual machine */
+                if ((ret = sqlite3_prepare(lsqlite3->sqlite,
+                                           "SELECT attr_name "
+                                           "  FROM " FILTER_ATTR_TABLE ";",
+                                           -1,
+                                           &pStmt,
+                                           NULL)) == SQLITE_SCHEMA) {
+                        continue;
+                } else if (ret != SQLITE_OK) {
+                        ret = -1;
+                        break;
+                }
+                
+                /* Loop through the returned rows */
+                for (ret = SQLITE_ROW; ret == SQLITE_ROW; ) {
+                        
+                        /* Get the next row */
+                        if ((ret = sqlite3_step(pStmt)) == SQLITE_ROW) {
+                                
+                                /*
+                                 * Get value from this row and append to table
+                                 * list
+                                 */
+                                p = discard_const_p(char,
+                                                    sqlite3_column_text(pStmt,
+                                                                        0));
+                                
+                                /* Append it to the table list */
+                                if ((p = talloc_asprintf(
+                                             hTalloc,
+                                             "%s%s",
+                                             *pTableList == '\0' ? "" : ",",
+                                             sqlite3_column_text(pStmt,
+                                                                 0))) == NULL) {
+                                        
+                                        talloc_free(pTableList);
+                                        return NULL;
+                                }
+                                
+                                /* We have a new table list */
+                                talloc_free(pTableList);
+                                pTableList = p;
+                        }
+                }
+                
+                if (ret == SQLITE_SCHEMA) {
+                        talloc_free(pTableList);
+                        continue;
+                }
+                
+                /* Free the virtual machine */
+                if ((ret = sqlite3_finalize(pStmt)) == SQLITE_SCHEMA) {
+                        (void) sqlite3_finalize(pStmt);
+                        continue;
+                } else if (ret != SQLITE_OK) {
+                        (void) sqlite3_finalize(pStmt);
+                        ret = -1;
+                        break;
+                }
+                
+                /*
+                 * Normal condition is only one time through loop.  Loop is
+                 * rerun in error conditions, via "continue", above.
+                 */
+                ret = 0;
+                bLoop = FALSE;
+        }
+
+        if (ret != 0) {
+                talloc_free(pTableList);
+                pTableList = NULL;
+        }
+
+        return pTableList;
 }
 
 
@@ -1291,30 +1667,30 @@ msg_to_sql(struct ldb_module * module,
        unsigned int                i;
         unsigned int                j;
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-
+        
        for (i = 0; i < msg->num_elements; i++) {
                const struct ldb_message_element *el = &msg->elements[i];
-
+                
                 if (! use_flags) {
                         flags = LDB_FLAG_MOD_ADD;
                 } else {
                         flags = el->flags & LDB_FLAG_MOD_MASK;
                 }
-
+                
                 /* Get a case-folded copy of the attribute name */
                 pAttrName = ldb_casefold((struct ldb_context *) module,
                                          el->name);
-
+                
                 if (flags == LDB_FLAG_MOD_ADD) {
                         /* Create the attribute table if it doesn't exist */
                         if (new_attr(module, pAttrName) != 0) {
                                 return -1;
                         }
                 }
-
+                
                 /* For each value of the specified attribute name... */
                for (j = 0; j < el->num_values; j++) {
-
+                        
                         /* ... bind the attribute value, if necessary */
                         switch (flags) {
                         case LDB_FLAG_MOD_ADD:
@@ -1335,9 +1711,9 @@ msg_to_sql(struct ldb_module * module,
                                              "  WHERE eid = %lld;",
                                              el->name, el->values[j].data,
                                              eid);
-                                      
+                                
                                 break;
-
+                                
                         case LDB_FLAG_MOD_REPLACE:
                                 QUERY_NOROWS(lsqlite3,
                                              FALSE,
@@ -1357,7 +1733,7 @@ msg_to_sql(struct ldb_module * module,
                                              el->name, el->values[j].data,
                                              eid);
                                 break;
-
+                                
                         case LDB_FLAG_MOD_DELETE:
                                 /* No additional parameters to this query */
                                 QUERY_NOROWS(lsqlite3,
@@ -1381,7 +1757,7 @@ msg_to_sql(struct ldb_module * module,
                         }
                }
        }
-
+        
        return 0;
 }
 
@@ -1401,7 +1777,7 @@ new_dn(struct ldb_module * module,
         struct ldb_dn_component *   pComponent;
        struct ldb_context *        ldb = module->ldb;
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-
+        
         /* Explode and normalize the DN */
         if ((pExplodedDN =
              ldb_explode_dn(ldb,
@@ -1410,12 +1786,12 @@ new_dn(struct ldb_module * module,
                             case_fold_attr_required)) == NULL) {
                 return -1;
         }
-
+        
         /* Allocate a string to hold the partial DN of each component */
         if ((pPartialDN = talloc_strdup(ldb, "")) == NULL) {
                 return -1;
         }
-
+        
         /* For each component of the DN (starting with the last one)... */
         eid = 0;
         for (nComponent = pExplodedDN->comp_num - 1, bFirst = TRUE;
@@ -1424,7 +1800,7 @@ new_dn(struct ldb_module * module,
                 
                 /* Point to the component */
                 pComponent = pExplodedDN->components[nComponent];
-
+                
                 /* Add this component on to the partial DN to date */
                 if ((p = talloc_asprintf(ldb,
                                          "%s%s%s",
@@ -1433,13 +1809,13 @@ new_dn(struct ldb_module * module,
                                          pPartialDN)) == NULL) {
                         return -1;
                 }
-
+                
                 /* No need for the old partial DN any more */
                 talloc_free(pPartialDN);
-
+                
                 /* Save the new partial DN */
                 pPartialDN = p;
-
+                
                 /*
                  * Ensure that an entry is in the ldb_entry table for this
                  * component.  Any component other than the last one
@@ -1467,30 +1843,30 @@ new_dn(struct ldb_module * module,
                                      nComponent == 0 ? "" : "OR IGNORE",
                                      eid, pPartialDN);
                 }
-
+                
                 /* Get the EID of the just inserted row (the next parent) */
                 eid = sqlite3_last_insert_rowid(lsqlite3->sqlite);
         }
-
+        
         /* Give 'em what they came for! */
         *pEID = eid;
-
+        
         return 0;
 }
 
 
 static int
 new_attr(struct ldb_module * module,
-                  char * pAttrName)
+         char * pAttrName)
 {
         long long                   bExists;
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-
+        
         /*
          * NOTE:
          *   pAttrName is assumed to already be case-folded here!
          */
-
+        
         /* See if the table already exists */
         QUERY_INT(lsqlite3,
                   bExists,
@@ -1500,7 +1876,7 @@ new_attr(struct ldb_module * module,
                   "  WHERE type = 'table' "
                   "    AND tbl_name = %Q;",
                   pAttrName);
-
+        
         /* Did it exist? */
         if (! bExists) {
                 /* Nope.  Create the table */
@@ -1513,7 +1889,7 @@ new_attr(struct ldb_module * module,
                              ");",
                              pAttrName);
         }
-
+        
         return 0;
 }