r13609: Get in the initial work on making ldb async
[kai/samba.git] / source / lib / ldb / ldb_sqlite3 / ldb_sqlite3.c
index e283e6b0c15b7249e392e6f13deef883250c4ec1..48d849746fe2479f8d0f5955d6a4d33e7d12f4da 100644 (file)
@@ -2,6 +2,7 @@
    ldb database library
    
    Copyright (C) Derrell Lipman  2005
+   Copyright (C) Simo Sorce 2005
    
    ** NOTE! The following LGPL license applies to the ldb
    ** library. This does NOT imply that all of Samba is released
  *  Author: Derrell Lipman (based on Andrew Tridgell's LDAP backend)
  */
 
-#include <stdarg.h>
 #include "includes.h"
-#include "ldb/include/ldb.h"
-#include "ldb/include/ldb_private.h"
+#include "ldb/include/includes.h"
+
 #include "ldb/ldb_sqlite3/ldb_sqlite3.h"
 
 /*
@@ -236,6 +236,7 @@ static char *parsetree_to_sql(struct ldb_module *module,
        char *wild_card_string;
        char *child, *tmp;
        char *ret = NULL;
+       char *attr;
        int i;
 
 
@@ -250,7 +251,7 @@ static char *parsetree_to_sql(struct ldb_module *module,
                        child = parsetree_to_sql(module, mem_ctx, t->u.list.elements[i]);
                        if (child == NULL) return NULL;
 
-                       tmp = talloc_asprintf_append(tmp, "INTERSECT %s ", child);
+                       tmp = talloc_asprintf_append(tmp, " INTERSECT %s ", child);
                        if (tmp == NULL) return NULL;
                }
 
@@ -268,11 +269,11 @@ static char *parsetree_to_sql(struct ldb_module *module,
                        child = parsetree_to_sql(module, mem_ctx, t->u.list.elements[i]);
                        if (child == NULL) return NULL;
 
-                       tmp = talloc_asprintf_append(tmp, "UNION %s ", child);
+                       tmp = talloc_asprintf_append(tmp, " UNION %s ", child);
                        if (tmp == NULL) return NULL;
                }
 
-               return talloc_asprintf(mem_ctx, "SELECT * FROM ( %s )", tmp);
+               return talloc_asprintf(mem_ctx, "SELECT * FROM ( %s ) ", tmp);
 
        case LDB_OP_NOT:
 
@@ -281,14 +282,16 @@ static char *parsetree_to_sql(struct ldb_module *module,
 
                return talloc_asprintf(mem_ctx,
                                        "SELECT eid FROM ldb_entry "
-                                       "WHERE eid NOT IN ( %s )", child);
+                                       "WHERE eid NOT IN ( %s ) ", child);
 
        case LDB_OP_EQUALITY:
                /*
                 * For simple searches, we want to retrieve the list of EIDs that
                 * match the criteria.
                */
-               h = ldb_attrib_handler(module->ldb, t->u.equality.attr);
+               attr = ldb_attr_casefold(module->ldb, mem_ctx, t->u.equality.attr);
+               if (attr == NULL) return NULL;
+               h = ldb_attrib_handler(module->ldb, attr);
 
                /* Get a canonicalised copy of the data */
                h->canonicalise_fn(module->ldb, mem_ctx, &(t->u.equality.value), &value);
@@ -326,9 +329,9 @@ static char *parsetree_to_sql(struct ldb_module *module,
                        /* A normal query. */
                        return lsqlite3_tprintf(mem_ctx,
                                                "SELECT eid FROM ldb_attribute_values "
-                                               "WHERE norm_attr_name = upper('%q') "
+                                               "WHERE norm_attr_name = '%q' "
                                                "AND norm_attr_value = '%q'",
-                                               t->u.equality.attr,
+                                               attr,
                                                value.data);
 
                }
@@ -341,7 +344,7 @@ static char *parsetree_to_sql(struct ldb_module *module,
 
                for (i = 0; t->u.substring.chunks[i]; i++) {
                        wild_card_string = talloc_asprintf_append(wild_card_string, "%s*",
-                                                       t->u.substring.chunks[i]);
+                                                       t->u.substring.chunks[i]->data);
                        if (wild_card_string == NULL) return NULL;
                }
 
@@ -350,7 +353,9 @@ static char *parsetree_to_sql(struct ldb_module *module,
                        wild_card_string[strlen(wild_card_string) - 1] = '\0';
                }
 
-               h = ldb_attrib_handler(module->ldb, t->u.substring.attr);
+               attr = ldb_attr_casefold(module->ldb, mem_ctx, t->u.substring.attr);
+               if (attr == NULL) return NULL;
+               h = ldb_attrib_handler(module->ldb, attr);
 
                subval.data = wild_card_string;
                subval.length = strlen(wild_card_string) + 1;
@@ -363,13 +368,15 @@ static char *parsetree_to_sql(struct ldb_module *module,
 
                return lsqlite3_tprintf(mem_ctx,
                                        "SELECT eid FROM ldb_attribute_values "
-                                       "WHERE norm_attr_name = upper('%q') "
+                                       "WHERE norm_attr_name = '%q' "
                                        "AND norm_attr_value GLOB '%q'",
-                                       t->u.substring.attr,
+                                       attr,
                                        value.data);
 
        case LDB_OP_GREATER:
-               h = ldb_attrib_handler(module->ldb, t->u.equality.attr);
+               attr = ldb_attr_casefold(module->ldb, mem_ctx, t->u.equality.attr);
+               if (attr == NULL) return NULL;
+               h = ldb_attrib_handler(module->ldb, attr);
 
                /* Get a canonicalised copy of the data */
                h->canonicalise_fn(module->ldb, mem_ctx, &(t->u.equality.value), &value);
@@ -379,13 +386,16 @@ static char *parsetree_to_sql(struct ldb_module *module,
 
                return lsqlite3_tprintf(mem_ctx,
                                        "SELECT eid FROM ldb_attribute_values "
-                                       "WHERE norm_attr_name = upper('%q') "
-                                       "AND norm_attr_value >= '%q'",
-                                       t->u.equality.attr,
-                                       value.data);
+                                       "WHERE norm_attr_name = '%q' "
+                                       "AND ldap_compare(norm_attr_value, '>=', '%q', '%q') ",
+                                       attr,
+                                       value.data,
+                                       attr);
 
        case LDB_OP_LESS:
-               h = ldb_attrib_handler(module->ldb, t->u.equality.attr);
+               attr = ldb_attr_casefold(module->ldb, mem_ctx, t->u.equality.attr);
+               if (attr == NULL) return NULL;
+               h = ldb_attrib_handler(module->ldb, attr);
 
                /* Get a canonicalised copy of the data */
                h->canonicalise_fn(module->ldb, mem_ctx, &(t->u.equality.value), &value);
@@ -395,23 +405,29 @@ static char *parsetree_to_sql(struct ldb_module *module,
 
                return lsqlite3_tprintf(mem_ctx,
                                        "SELECT eid FROM ldb_attribute_values "
-                                       "WHERE norm_attr_name = upper('%q') "
-                                       "AND norm_attr_value <= '%q'",
-                                       t->u.equality.attr,
-                                       value.data);
+                                       "WHERE norm_attr_name = '%q' "
+                                       "AND ldap_compare(norm_attr_value, '<=', '%q', '%q') ",
+                                       attr,
+                                       value.data,
+                                       attr);
 
        case LDB_OP_PRESENT:
-               if (strcasecmp(t->u.present.attr, "dn")) {
+               if (strcasecmp(t->u.present.attr, "dn") == 0) {
                        return talloc_strdup(mem_ctx, "SELECT eid FROM ldb_entry");
                }
 
+               attr = ldb_attr_casefold(module->ldb, mem_ctx, t->u.present.attr);
+               if (attr == NULL) return NULL;
+
                return lsqlite3_tprintf(mem_ctx,
                                        "SELECT eid FROM ldb_attribute_values "
-                                       "WHERE norm_attr_name = upper('%q')",
-                                       t->u.present.attr);
+                                       "WHERE norm_attr_name = '%q' ",
+                                       attr);
 
        case LDB_OP_APPROX:
-               h = ldb_attrib_handler(module->ldb, t->u.equality.attr);
+               attr = ldb_attr_casefold(module->ldb, mem_ctx, t->u.equality.attr);
+               if (attr == NULL) return NULL;
+               h = ldb_attrib_handler(module->ldb, attr);
 
                /* Get a canonicalised copy of the data */
                h->canonicalise_fn(module->ldb, mem_ctx, &(t->u.equality.value), &value);
@@ -421,10 +437,11 @@ static char *parsetree_to_sql(struct ldb_module *module,
 
                return lsqlite3_tprintf(mem_ctx,
                                        "SELECT eid FROM ldb_attribute_values "
-                                       "WHERE norm_attr_name = upper('%q') "
-                                       "AND norm_attr_value LIKE '%q'",
-                                       t->u.equality.attr,
-                                       value.data);
+                                       "WHERE norm_attr_name = '%q' "
+                                       "AND ldap_compare(norm_attr_value, '~%', 'q', '%q') ",
+                                       attr,
+                                       value.data,
+                                       attr);
                
        case LDB_OP_EXTENDED:
 #warning  "work out how to handle bitops"
@@ -439,153 +456,6 @@ static char *parsetree_to_sql(struct ldb_module *module,
        return NULL;
 }
 
-
-/*
- * query_norows()
- *
- * This function is used for queries that are not expected to return any rows,
- * e.g. BEGIN, COMMIT, ROLLBACK, CREATE TABLE, INSERT, UPDATE, DELETE, etc.
- * There are no provisions here for returning data from rows in a table, so do
- * not pass SELECT queries to this function.
- */
-static int
-query_norows(const struct lsqlite3_private *lsqlite3,
-             const char *pSql,
-             ...)
-{
-        int             ret;
-        int             bLoop;
-        char *          p;
-        sqlite3_stmt *  pStmt;
-        va_list         args;
-        /* Begin access to variable argument list */
-        va_start(args, pSql);
-        
-        /* Format the query */
-        if ((p = sqlite3_vmprintf(pSql, args)) == NULL) {
-                return -1;
-        }
-        
-        /*
-         * Prepare and execute the SQL statement.  Loop allows retrying on
-         * certain errors, e.g. SQLITE_SCHEMA occurs if the schema changes,
-         * requiring retrying the operation.
-         */
-        for (bLoop = TRUE; bLoop; ) {
-                
-                /* Compile the SQL statement into sqlite virtual machine */
-                if ((ret = sqlite3_prepare(lsqlite3->sqlite,
-                                           p,
-                                           -1,
-                                           &pStmt,
-                                           NULL)) == SQLITE_SCHEMA) {
-                        if (stmtGetEID != NULL) {
-                                sqlite3_finalize(stmtGetEID);
-                                stmtGetEID = NULL;
-                        }
-                        continue;
-                } else if (ret != SQLITE_OK) {
-                        ret = -1;
-                        break;
-                }
-                
-                /* No rows expected, so just step through machine code once */
-                if ((ret = sqlite3_step(pStmt)) == SQLITE_SCHEMA) {
-                        if (stmtGetEID != NULL) {
-                                sqlite3_finalize(stmtGetEID);
-                                stmtGetEID = NULL;
-                        }
-                        (void) sqlite3_finalize(pStmt);
-                        continue;
-                } else if (ret != SQLITE_DONE) {
-                        (void) sqlite3_finalize(pStmt);
-                        ret = -1;
-                        break;
-                }
-                
-                /* Free the virtual machine */
-                if ((ret = sqlite3_finalize(pStmt)) == SQLITE_SCHEMA) {
-                        if (stmtGetEID != NULL) {
-                                sqlite3_finalize(stmtGetEID);
-                                stmtGetEID = NULL;
-                        }
-                        continue;
-                } else if (ret != SQLITE_OK) {
-                        (void) sqlite3_finalize(pStmt);
-                        ret = -1;
-                        break;
-                }
-                
-                /*
-                 * Normal condition is only one time through loop.  Loop is
-                 * rerun in error conditions, via "continue", above.
-                 */
-                ret = 0;
-                bLoop = FALSE;
-        }
-        
-        /* All done with variable argument list */
-        va_end(args);
-        
-        /* Free the memory we allocated for our query string */
-        sqlite3_free(p);
-        
-        return ret;
-}
-
-
-/* obtain a named lock */
-static int
-lsqlite3_lock(struct ldb_module * module,
-              const char * lockname)
-{
-       struct lsqlite3_private *   lsqlite3 = module->private_data;
-
-/* FIXME
-       if (lockname == NULL) {
-               return -1;
-       }
-        
-        if (strcmp(lockname, "transaction") == 0) {
-                if (lsqlite3->lock_count == 0) {
-                        if (query_norows(lsqlite3, "BEGIN EXCLUSIVE;") != 0) {
-                                return -1;
-                        }
-                }
-                ++lsqlite3->lock_count;
-        }
-*/
-       return 0;
-}
-
-/* release a named lock */
-static int
-lsqlite3_unlock(struct ldb_module *module,
-                const char *lockname)
-{
-       struct lsqlite3_private *   lsqlite3 = module->private_data;
-
-/* FIXME
-       if (lockname == NULL) {
-               return -1;
-       }
-        
-        if (strcmp(lockname, "transaction") == 0) {
-                if (lsqlite3->lock_count == 1) {
-                        if (query_norows(lsqlite3, "COMMIT;") != 0) {
-                                query_norows(lsqlite3, "ROLLBACK;");
-                        }
-                } else if (lsqlite3->lock_count > 0) {
-                        --lsqlite3->lock_count;
-                }
-        } else if (strcmp(lockname, "rollback") == 0) {
-                query_norows(lsqlite3, "ROLLBACK;");
-        }
-*/
-        return 0;
-}
-
 /*
  * query_int()
  *
@@ -682,6 +552,76 @@ query_int(const struct lsqlite3_private * lsqlite3,
         return ret;
 }
 
+/*
+ * This is a bad hack to support ldap style comparisons whithin sqlite.
+ * val is the attribute in the row currently under test
+ * func is the desired test "<=" ">=" "~" ":"
+ * cmp is the value to compare against (eg: "test")
+ * attr is the attribute name the value of which we want to test
+ */
+
+static void lsqlite3_compare(sqlite3_context *ctx, int argc,
+                                       sqlite3_value **argv)
+{
+       struct ldb_context *ldb = (struct ldb_context *)sqlite3_user_data(ctx);
+       const unsigned char *val = sqlite3_value_text(argv[0]);
+       const unsigned char *func = sqlite3_value_text(argv[1]);
+       const unsigned char *cmp = sqlite3_value_text(argv[2]);
+       const unsigned char *attr = sqlite3_value_text(argv[3]);
+       const struct ldb_attrib_handler *h;
+       struct ldb_val valX;
+       struct ldb_val valY;
+       int ret;
+
+       switch (func[0]) {
+       /* greater */
+       case '>': /* >= */
+               h = ldb_attrib_handler(ldb, attr);
+               valX.data = cmp;
+               valX.length = strlen(cmp);
+               valY.data = val;
+               valY.length = strlen(val);
+               ret = h->comparison_fn(ldb, ldb, &valY, &valX);
+               if (ret >= 0)
+                       sqlite3_result_int(ctx, 1);
+               else
+                       sqlite3_result_int(ctx, 0);
+               return;
+
+       /* lesser */
+       case '<': /* <= */
+               h = ldb_attrib_handler(ldb, attr);
+               valX.data = cmp;
+               valX.length = strlen(cmp);
+               valY.data = val;
+               valY.length = strlen(val);
+               ret = h->comparison_fn(ldb, ldb, &valY, &valX);
+               if (ret <= 0)
+                       sqlite3_result_int(ctx, 1);
+               else
+                       sqlite3_result_int(ctx, 0);
+               return;
+
+       /* approx */
+       case '~':
+               /* TODO */
+               sqlite3_result_int(ctx, 0);
+               return;
+
+       /* bitops */
+       case ':':
+               /* TODO */
+               sqlite3_result_int(ctx, 0);
+               return;
+
+       default:
+               break;
+       }
+
+       sqlite3_result_error(ctx, "Value must start with a special operation char (<>~:)!", -1);
+       return;
+}
+
 
 /* rename a record */
 static int lsqlite3_safe_rollback(sqlite3 *sqlite)
@@ -693,7 +633,7 @@ static int lsqlite3_safe_rollback(sqlite3 *sqlite)
        ret = sqlite3_exec(sqlite, "ROLLBACK;", NULL, NULL, &errmsg);
        if (ret != SQLITE_OK) {
                if (errmsg) {
-                       printf("lsqlite3_safe_rollback: Serious Error: %s\n", errmsg);
+                       printf("lsqlite3_safe_rollback: Error: %s\n", errmsg);
                        free(errmsg);
                }
                return -1;
@@ -718,6 +658,7 @@ struct lsqlite3_msgs {
        int count;
        struct ldb_message **msgs;
        long long current_eid;
+       const char * const * attrs;
        TALLOC_CTX *mem_ctx;
 };
 
@@ -730,6 +671,7 @@ static int lsqlite3_search_callback(void *result, int col_num, char **cols, char
        struct lsqlite3_msgs *msgs = (struct lsqlite3_msgs *)result;
        struct ldb_message *msg;
        long long eid;
+       int i;
 
        /* eid, dn, attr_name, attr_value */
        if (col_num != 4) return SQLITE_ABORT;
@@ -762,26 +704,19 @@ static int lsqlite3_search_callback(void *result, int col_num, char **cols, char
                if (msg->dn == NULL) return SQLITE_ABORT;
        }
 
-       msg->elements = talloc_realloc(msg,
-                                      msg->elements,
-                                      struct ldb_message_element,
-                                      msg->num_elements + 1);
-       if (msg->elements == NULL) return SQLITE_ABORT;
-
-       msg->elements[msg->num_elements].flags = 0;
-       msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, cols[2]);
-       if (msg->elements[msg->num_elements].name == NULL) return SQLITE_ABORT;
-
-       msg->elements[msg->num_elements].num_values = 1;
-       msg->elements[msg->num_elements].values = talloc_array(msg->elements,
-                                                               struct ldb_val, 1);
-       if (msg->elements[msg->num_elements].values == NULL) return SQLITE_ABORT;
-
-       msg->elements[msg->num_elements].values[0].length = strlen(cols[3]);
-       msg->elements[msg->num_elements].values[0].data = talloc_strdup(msg->elements, cols[3]);
-       if (msg->elements[msg->num_elements].values[0].data == NULL) return SQLITE_ABORT;
+       if (msgs->attrs) {
+               int found = 0;
+               for (i = 0; msgs->attrs[i]; i++) {
+                       if (strcasecmp(cols[2], msgs->attrs[i]) == 0) {
+                               found = 1;
+                               break;
+                       }
+               }
+               if (!found) return 0;
+       }
 
-       msg->num_elements++;
+       if (ldb_msg_add_string(msg, cols[2], cols[3]) != 0)
+               return SQLITE_ABORT;
 
        return SQLITE_OK;
 }
@@ -855,69 +790,40 @@ done:
 /* search for matching records, by tree */
 static int lsqlite3_search_bytree(struct ldb_module * module, const struct ldb_dn* basedn,
                                  enum ldb_scope scope, struct ldb_parse_tree * tree,
-                                 const char * const * attrs, struct ldb_message *** res)
+                                 const char * const * attrs, struct ldb_result ** res)
 {
        TALLOC_CTX *local_ctx;
        struct lsqlite3_private *lsqlite3 = module->private_data;
        struct lsqlite3_msgs msgs;
        char *norm_basedn;
-        char *attr_list;
        char *sqlfilter;
        char *errmsg;
-       char *query;
+       char *query = NULL;
         int ret, i;
 
        /* create a local ctx */
-       local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_search_by_tree local context");
+       local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_search_bytree local context");
        if (local_ctx == NULL) {
                return -1;
        }
 
        if (basedn) {
                norm_basedn = ldb_dn_linearize(local_ctx, ldb_dn_casefold(module->ldb, basedn));
-               if (norm_basedn == NULL) goto failed;
+               if (norm_basedn == NULL) {
+                       ret = LDB_ERR_INVALID_DN_SYNTAX;
+                       goto failed;
+               }
        } else norm_basedn = talloc_strdup(local_ctx, "");
 
        if (*norm_basedn == '\0' &&
-               (scope == LDB_SCOPE_BASE || scope == LDB_SCOPE_ONELEVEL))
+               (scope == LDB_SCOPE_BASE || scope == LDB_SCOPE_ONELEVEL)) {
+                       ret = LDB_ERR_UNWILLING_TO_PERFORM;
                        goto failed;
+               }
 
         /* Convert filter into a series of SQL conditions (constraints) */
        sqlfilter = parsetree_to_sql(module, local_ctx, tree);
         
-        /* Initially, we don't know what the requested attributes are */
-       if (attrs != NULL) {
-               attr_list = talloc_strdup(local_ctx, "AND norm_attr_name IN (");
-               if (attr_list == NULL) goto failed;
-
-               for (i = 0; attrs[i]; i++) {
-                       char *norm_attr_name;
-
-                       /* If any attribute in the list is "*" then... */
-                       if (strcmp(attrs[i], "*") == 0) {
-                               /* we want all attribute types */
-                               attr_list = talloc_strdup(local_ctx, "");
-                               if (attr_list == NULL) goto failed;
-                               break;
-                       }
-
-                       norm_attr_name = ldb_casefold(local_ctx, attrs[i]);
-                       if (norm_attr_name == NULL) goto failed;
-
-                       attr_list = talloc_asprintf_append(attr_list, "'%q', ",
-                                                          norm_attr_name);
-                       if (attr_list == NULL) goto failed;
-
-               }
-
-               /* substitute the last ',' with ')' */
-               attr_list[strlen(attr_list)-2] = ')';
-
-       } else {
-               attr_list = talloc_strdup(local_ctx, "");
-               if (attr_list == NULL) goto failed;
-       }
-
         switch(scope) {
         case LDB_SCOPE_DEFAULT:
         case LDB_SCOPE_SUBTREE:
@@ -941,13 +847,10 @@ static int lsqlite3_search_bytree(struct ldb_module * module, const struct ldb_d
                                "         (%s)\n"
                                "    )\n"
 
-                               "    %s\n"
-
                                "  ORDER BY entry.eid ASC;",
                                norm_basedn,
                                norm_basedn,
-                               sqlfilter,
-                               attr_list);
+                               sqlfilter);
                } else {
                        query = lsqlite3_tprintf(local_ctx,
                                "SELECT entry.eid,\n"
@@ -966,11 +869,8 @@ static int lsqlite3_search_bytree(struct ldb_module * module, const struct ldb_d
                                "         (%s)\n"
                                "    )\n"
 
-                               "    %s\n"
-
                                "  ORDER BY entry.eid ASC;",
-                               sqlfilter,
-                               attr_list);
+                               sqlfilter);
                }
 
                break;
@@ -994,12 +894,9 @@ static int lsqlite3_search_bytree(struct ldb_module * module, const struct ldb_d
                        "           (%s)\n"
                         "    )\n"
 
-                        "    %s\n"
-
                         "  ORDER BY entry.eid ASC;",
                        norm_basedn,
-                        sqlfilter,
-                       attr_list);
+                        sqlfilter);
                 break;
                 
         case LDB_SCOPE_ONELEVEL:
@@ -1021,89 +918,62 @@ static int lsqlite3_search_bytree(struct ldb_module * module, const struct ldb_d
                         "         AND ldb_entry.eid IN\n(%s)\n"
                         "    )\n"
 
-                        "    %s\n"
-
                         "  ORDER BY entry.eid ASC;",
                         norm_basedn,
                         norm_basedn,
-                        sqlfilter,
-                       attr_list);
+                        sqlfilter);
                 break;
         }
 
         if (query == NULL) {
-                ret = -1;
+                ret = LDB_ERR_OTHER;
                 goto failed;
         }
 
-       /* printf ("%s\n", query); */
+       /* * /
+       printf ("%s\n", query);
+       / * */
 
        msgs.msgs = NULL;
        msgs.count = 0;
        msgs.current_eid = 0;
        msgs.mem_ctx = local_ctx;
+       msgs.attrs = attrs;
 
        ret = sqlite3_exec(lsqlite3->sqlite, query, lsqlite3_search_callback, &msgs, &errmsg);
        if (ret != SQLITE_OK) {
                if (errmsg) {
-                       printf("lsqlite3_search_bytree: Fatal Error: %s\n", errmsg);
+                       ldb_set_errstring(module, talloc_strdup(module, errmsg));
                        free(errmsg);
                }
+               ret = LDB_ERR_OTHER;
                goto failed;
        }
 
        for (i = 0; i < msgs.count; i++) {
                msgs.msgs[i] = ldb_msg_canonicalize(module->ldb, msgs.msgs[i]);
-               if (msgs.msgs[i] ==  NULL) goto failed;
+               if (msgs.msgs[i] ==  NULL) {
+                       goto failed;
+               }
        }
 
-       *res = talloc_steal(module, msgs.msgs);
-       ret = msgs.count;
+       *res = talloc(module, struct ldb_result);
+       if (! *res) {
+               goto failed;
+       }
+
+       (*res)->msgs = talloc_steal(*res, msgs.msgs);
+       (*res)->count = msgs.count;
+       (*res)->refs = NULL;
+       (*res)->controls = NULL;
 
        talloc_free(local_ctx);
-       return ret;
+       return LDB_SUCCESS;
 
 /* If error, return error code; otherwise return number of results */
 failed:
         talloc_free(local_ctx);
-       return -1;
-}
-
-/* search for matching records, by expression */
-static int lsqlite3_search(struct ldb_module * module, const struct ldb_dn *basedn,
-                          enum ldb_scope scope, const char * expression,
-                          const char * const *attrs, struct ldb_message *** res)
-{
-        struct ldb_parse_tree * tree;
-        int ret;
-        
-        /* Handle tdb specials */
-        if (ldb_dn_is_special(basedn)) {
-#warning "handle tdb specials"
-                return 0;
-        }
-
-#if 0 
-/* (|(objectclass=*)(dn=*)) is  passed by the command line tool now instead */
-        /* Handle the special case of requesting all */
-        if (pExpression != NULL && *pExpression == '\0') {
-                pExpression = "dn=*";
-        }
-#endif
-
-        /* Parse the filter expression into a tree we can work with */
-       if ((tree = ldb_parse_tree(module->ldb, expression)) == NULL) {
-                return -1;
-       }
-        
-        /* Now use the bytree function for the remainder of processing */
-        ret = lsqlite3_search_bytree(module, basedn, scope, tree, attrs, res);
-        
-        /* Free the parse tree */
-       talloc_free(tree);
-        
-        /* All done. */
-        return ret;
+       return LDB_ERR_OTHER;
 }
 
 
@@ -1116,14 +986,13 @@ static int lsqlite3_add(struct ldb_module *module, const struct ldb_message *msg
        char *dn, *ndn;
        char *errmsg;
        char *query;
-       int rollback = 0;
        int ret;
        int i;
         
        /* create a local ctx */
        local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_add local context");
        if (local_ctx == NULL) {
-               return -1;
+               return LDB_ERR_OTHER;
        }
 
         /* See if this is an ltdb special */
@@ -1133,47 +1002,55 @@ static int lsqlite3_add(struct ldb_module *module, const struct ldb_message *msg
                c = ldb_dn_explode(local_ctx, "@SUBCLASSES");
                if (ldb_dn_compare(module->ldb, msg->dn, c) == 0) {
 #warning "insert subclasses into object class tree"
+                       ret = LDB_ERR_UNWILLING_TO_PERFORM;
                        goto failed;
                }
 
+/*
                c = ldb_dn_explode(local_ctx, "@INDEXLIST");
                if (ldb_dn_compare(module->ldb, msg->dn, c) == 0) {
 #warning "should we handle indexes somehow ?"
                        goto failed;
                }
-
+*/
                 /* Others are implicitly ignored */
-                return 0;
+                return LDB_SUCCESS;
        }
 
        /* create linearized and normalized dns */
        dn = ldb_dn_linearize(local_ctx, msg->dn);
        ndn = ldb_dn_linearize(local_ctx, ldb_dn_casefold(module->ldb, msg->dn));
-       if (dn == NULL || ndn == NULL) goto failed;
+       if (dn == NULL || ndn == NULL) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        query = lsqlite3_tprintf(local_ctx,
-                                  /* Begin the transaction */
-                                  "BEGIN EXCLUSIVE; "
                                   /* Add new entry */
                                   "INSERT OR ABORT INTO ldb_entry "
                                   "('dn', 'norm_dn') "
                                   "VALUES ('%q', '%q');",
                                dn, ndn);
-       if (query == NULL) goto failed;
+       if (query == NULL) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        ret = sqlite3_exec(lsqlite3->sqlite, query, NULL, NULL, &errmsg);
        if (ret != SQLITE_OK) {
                if (errmsg) {
-                       printf("lsqlite3_add: exec error: %s\n", errmsg);
+                       ldb_set_errstring(module, talloc_strdup(module, errmsg));
                        free(errmsg);
                }
-               lsqlite3_safe_rollback(lsqlite3->sqlite); 
+               ret = LDB_ERR_OTHER;
                goto failed;
        }
-       rollback = 1;
 
        eid = lsqlite3_get_eid_ndn(lsqlite3->sqlite, local_ctx, ndn);
-       if (eid == -1) goto failed;
+       if (eid == -1) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        for (i = 0; i < msg->num_elements; i++) {
                const struct ldb_message_element *el = &msg->elements[i];
@@ -1182,8 +1059,11 @@ static int lsqlite3_add(struct ldb_module *module, const struct ldb_message *msg
                int j;
 
                /* Get a case-folded copy of the attribute name */
-               attr = ldb_casefold(local_ctx, el->name);
-               if (attr == NULL) goto failed;
+               attr = ldb_attr_casefold(module->ldb, local_ctx, el->name);
+               if (attr == NULL) {
+                       ret = LDB_ERR_OTHER;
+                       goto failed;
+               }
 
                h = ldb_attrib_handler(module->ldb, el->name);
 
@@ -1194,7 +1074,10 @@ static int lsqlite3_add(struct ldb_module *module, const struct ldb_message *msg
 
                        /* Get a canonicalised copy of the data */
                        h->canonicalise_fn(module->ldb, local_ctx, &(el->values[j]), &value);
-                       if (value.data == NULL) goto failed;
+                       if (value.data == NULL) {
+                               ret = LDB_ERR_OTHER;
+                               goto failed;
+                       }
 
                        insert = lsqlite3_tprintf(local_ctx,
                                        "INSERT OR ROLLBACK INTO ldb_attribute_values "
@@ -1203,35 +1086,29 @@ static int lsqlite3_add(struct ldb_module *module, const struct ldb_message *msg
                                        "VALUES ('%lld', '%q', '%q', '%q', '%q');",
                                        eid, el->name, attr,
                                        el->values[j].data, value.data);
-                       if (insert == NULL) goto failed;
+                       if (insert == NULL) {
+                               ret = LDB_ERR_OTHER;
+                               goto failed;
+                       }
 
                        ret = sqlite3_exec(lsqlite3->sqlite, insert, NULL, NULL, &errmsg);
                        if (ret != SQLITE_OK) {
                                if (errmsg) {
-                                       printf("lsqlite3_add: insert error: %s\n", errmsg);
+                                       ldb_set_errstring(module, talloc_strdup(module, errmsg));
                                        free(errmsg);
                                }
+                               ret = LDB_ERR_OTHER;
                                goto failed;
                        }
                }
        }
 
-       ret = sqlite3_exec(lsqlite3->sqlite, "COMMIT;", NULL, NULL, &errmsg);
-       if (ret != SQLITE_OK) {
-               if (errmsg) {
-                       printf("lsqlite3_add: commit error: %s\n", errmsg);
-                       free(errmsg);
-               }
-               goto failed;
-       }
-
        talloc_free(local_ctx);
-        return 0;
+        return LDB_SUCCESS;
 
 failed:
-       if (rollback) lsqlite3_safe_rollback(lsqlite3->sqlite); 
        talloc_free(local_ctx);
-       return -1;
+       return ret;
 }
 
 
@@ -1242,14 +1119,13 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
        struct lsqlite3_private *lsqlite3 = module->private_data;
         long long eid;
        char *errmsg;
-       int rollback = 0;
        int ret;
        int i;
         
        /* create a local ctx */
        local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_modify local context");
        if (local_ctx == NULL) {
-               return -1;
+               return LDB_ERR_OTHER;
        }
 
         /* See if this is an ltdb special */
@@ -1259,31 +1135,17 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                c = ldb_dn_explode(local_ctx, "@SUBCLASSES");
                if (ldb_dn_compare(module->ldb, msg->dn, c) == 0) {
 #warning "modify subclasses into object class tree"
-                       goto failed;
-               }
-
-               c = ldb_dn_explode(local_ctx, "@INDEXLIST");
-               if (ldb_dn_compare(module->ldb, msg->dn, c) == 0) {
-#warning "should we handle indexes somehow ?"
+                       ret = LDB_ERR_UNWILLING_TO_PERFORM;
                        goto failed;
                }
 
                 /* Others are implicitly ignored */
-                return 0;
-       }
-
-       ret = sqlite3_exec(lsqlite3->sqlite, "BEGIN EXCLUSIVE;", NULL, NULL, &errmsg);
-       if (ret != SQLITE_OK) {
-               if (errmsg) {
-                       printf("lsqlite3_modify: error: %s\n", errmsg);
-                       free(errmsg);
-               }
-               goto failed;
+                return LDB_SUCCESS;
        }
-       rollback = 1;
 
        eid = lsqlite3_get_eid(module, msg->dn);
        if (eid == -1) {
+               ret = LDB_ERR_OTHER;
                goto failed;
        }
 
@@ -1296,8 +1158,9 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                int j;
 
                /* Get a case-folded copy of the attribute name */
-               attr = ldb_casefold(local_ctx, el->name);
+               attr = ldb_attr_casefold(module->ldb, local_ctx, el->name);
                if (attr == NULL) {
+                       ret = LDB_ERR_OTHER;
                        goto failed;
                }
 
@@ -1313,14 +1176,18 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                                                "WHERE eid = '%lld' "
                                                "AND norm_attr_name = '%q';",
                                                eid, attr);
-                       if (mod == NULL) goto failed;
+                       if (mod == NULL) {
+                               ret = LDB_ERR_OTHER;
+                               goto failed;
+                       }
 
                        ret = sqlite3_exec(lsqlite3->sqlite, mod, NULL, NULL, &errmsg);
                        if (ret != SQLITE_OK) {
                                if (errmsg) {
-                                       printf("lsqlite3_modify: error: %s\n", errmsg);
+                                       ldb_set_errstring(module, talloc_strdup(module, errmsg));
                                        free(errmsg);
                                }
+                               ret = LDB_ERR_OTHER;
                                goto failed;
                         }
 
@@ -1335,6 +1202,7 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                                /* Get a canonicalised copy of the data */
                                h->canonicalise_fn(module->ldb, local_ctx, &(el->values[j]), &value);
                                if (value.data == NULL) {
+                                       ret = LDB_ERR_OTHER;
                                        goto failed;
                                }
 
@@ -1346,14 +1214,18 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                                        eid, el->name, attr,
                                        el->values[j].data, value.data);
 
-                               if (mod == NULL) goto failed;
+                               if (mod == NULL) {
+                                       ret = LDB_ERR_OTHER;
+                                       goto failed;
+                               }
 
                                ret = sqlite3_exec(lsqlite3->sqlite, mod, NULL, NULL, &errmsg);
                                if (ret != SQLITE_OK) {
                                        if (errmsg) {
-                                               printf("lsqlite3_modify: error: %s\n", errmsg);
+                                               ldb_set_errstring(module, talloc_strdup(module, errmsg));
                                                free(errmsg);
                                        }
+                                       ret = LDB_ERR_OTHER;
                                        goto failed;
                                }
                        }
@@ -1368,14 +1240,18 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                                                        "WHERE eid = '%lld' "
                                                        "AND norm_attr_name = '%q';",
                                                        eid, attr);
-                               if (mod == NULL) goto failed;
+                               if (mod == NULL) {
+                                       ret = LDB_ERR_OTHER;
+                                       goto failed;
+                               }
 
                                ret = sqlite3_exec(lsqlite3->sqlite, mod, NULL, NULL, &errmsg);
                                if (ret != SQLITE_OK) {
                                        if (errmsg) {
-                                               printf("lsqlite3_modify: error: %s\n", errmsg);
+                                               ldb_set_errstring(module, talloc_strdup(module, errmsg));
                                                free(errmsg);
                                        }
+                                       ret = LDB_ERR_OTHER;
                                        goto failed;
                                }
                        }
@@ -1387,6 +1263,7 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                                /* Get a canonicalised copy of the data */
                                h->canonicalise_fn(module->ldb, local_ctx, &(el->values[j]), &value);
                                if (value.data == NULL) {
+                                       ret = LDB_ERR_OTHER;
                                        goto failed;
                                }
 
@@ -1397,14 +1274,18 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                                        "AND norm_attr_value = '%q';",
                                        eid, attr, value.data);
 
-                               if (mod == NULL) goto failed;
+                               if (mod == NULL) {
+                                       ret = LDB_ERR_OTHER;
+                                       goto failed;
+                               }
 
                                ret = sqlite3_exec(lsqlite3->sqlite, mod, NULL, NULL, &errmsg);
                                if (ret != SQLITE_OK) {
                                        if (errmsg) {
-                                               printf("lsqlite3_modify: error: %s\n", errmsg);
+                                               ldb_set_errstring(module, talloc_strdup(module, errmsg));
                                                free(errmsg);
                                        }
+                                       ret = LDB_ERR_OTHER;
                                        goto failed;
                                }
                        }
@@ -1413,22 +1294,12 @@ static int lsqlite3_modify(struct ldb_module *module, const struct ldb_message *
                }
        }
 
-       ret = sqlite3_exec(lsqlite3->sqlite, "COMMIT;", NULL, NULL, &errmsg);
-       if (ret != SQLITE_OK) {
-               if (errmsg) {
-                       printf("lsqlite3_modify: error: %s\n", errmsg);
-                       free(errmsg);
-               }
-               goto failed;
-       }
-
        talloc_free(local_ctx);
-        return 0;
+        return LDB_SUCCESS;
 
 failed:
-       if (rollback) lsqlite3_safe_rollback(lsqlite3->sqlite); 
        talloc_free(local_ctx);
-       return -1;
+       return ret;
 }
 
 /* delete a record */
@@ -1443,46 +1314,48 @@ static int lsqlite3_delete(struct ldb_module *module, const struct ldb_dn *dn)
 
        /* ignore ltdb specials */
        if (ldb_dn_is_special(dn)) {
-               return 0;
+               return LDB_SUCCESS;
        }
 
        /* create a local ctx */
        local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_delete local context");
        if (local_ctx == NULL) {
-               return -1;
+               return LDB_ERR_OTHER;
        }
 
        eid = lsqlite3_get_eid(module, dn);
-       if (eid == -1) goto failed;
+       if (eid == -1) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        query = lsqlite3_tprintf(local_ctx,
-                                  /* Begin the transaction */
-                                  "BEGIN EXCLUSIVE; "
                                   /* Delete entry */
                                   "DELETE FROM ldb_entry WHERE eid = %lld; "
                                   /* Delete attributes */
-                                  "DELETE FROM ldb_attribute_values WHERE eid = %lld; "
-                                  /* Commit */
-                                  "COMMIT;",
+                                  "DELETE FROM ldb_attribute_values WHERE eid = %lld; ",
                                eid, eid);
-       if (query == NULL) goto failed;
+       if (query == NULL) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        ret = sqlite3_exec(lsqlite3->sqlite, query, NULL, NULL, &errmsg);
        if (ret != SQLITE_OK) {
                if (errmsg) {
-                       printf("lsqlite3_delete: error getting eid: %s\n", errmsg);
+                       ldb_set_errstring(module, talloc_strdup(module, errmsg));
                        free(errmsg);
                }
-               lsqlite3_safe_rollback(lsqlite3->sqlite);
+               ret = LDB_ERR_OTHER;
                goto failed;
        }
 
        talloc_free(local_ctx);
-        return 0;
+        return LDB_SUCCESS;
 
 failed:
        talloc_free(local_ctx);
-       return -1;
+       return ret;
 }
 
 /* rename a record */
@@ -1497,69 +1370,136 @@ static int lsqlite3_rename(struct ldb_module *module, const struct ldb_dn *olddn
 
        /* ignore ltdb specials */
        if (ldb_dn_is_special(olddn) || ldb_dn_is_special(newdn)) {
-               return 0;
+               return LDB_SUCCESS;
        }
 
        /* create a local ctx */
        local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_rename local context");
        if (local_ctx == NULL) {
-               return -1;
+               return LDB_ERR_OTHER;
        }
 
        /* create linearized and normalized dns */
        old_cdn = ldb_dn_linearize(local_ctx, ldb_dn_casefold(module->ldb, olddn));
        new_cdn = ldb_dn_linearize(local_ctx, ldb_dn_casefold(module->ldb, newdn));
        new_dn = ldb_dn_linearize(local_ctx, newdn);
-       if (old_cdn == NULL || new_cdn == NULL || new_dn == NULL) goto failed;
+       if (old_cdn == NULL || new_cdn == NULL || new_dn == NULL) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        /* build the SQL query */
        query = lsqlite3_tprintf(local_ctx,
                                 "UPDATE ldb_entry SET dn = '%q', norm_dn = '%q' "
                                 "WHERE norm_dn = '%q';",
                                 new_dn, new_cdn, old_cdn);
-       if (query == NULL) goto failed;
+       if (query == NULL) {
+               ret = LDB_ERR_OTHER;
+               goto failed;
+       }
 
        /* execute */
        ret = sqlite3_exec(lsqlite3->sqlite, query, NULL, NULL, &errmsg);
        if (ret != SQLITE_OK) {
                if (errmsg) {
-                       printf("lsqlite3_rename: sqlite3_exec error: %s\n", errmsg);
+                       ldb_set_errstring(module, talloc_strdup(module, errmsg));
                        free(errmsg);
                }
+               ret = LDB_ERR_OTHER;
                goto failed;
        }
 
        /* clean up and exit */
        talloc_free(local_ctx);
-        return 0;
+        return LDB_SUCCESS;
 
 failed:
        talloc_free(local_ctx);
-       return -1;
+       return ret;
 }
-/* return extended error information */
-static const char *
-lsqlite3_errstring(struct ldb_module *module)
+
+static int lsqlite3_start_trans(struct ldb_module * module)
 {
+       int ret;
+       char *errmsg;
        struct lsqlite3_private *   lsqlite3 = module->private_data;
-        
-       return sqlite3_errmsg(lsqlite3->sqlite);
+
+       if (lsqlite3->trans_count == 0) {
+               ret = sqlite3_exec(lsqlite3->sqlite, "BEGIN IMMEDIATE;", NULL, NULL, &errmsg);
+               if (ret != SQLITE_OK) {
+                       if (errmsg) {
+                               printf("lsqlite3_start_trans: error: %s\n", errmsg);
+                               free(errmsg);
+                       }
+                       return -1;
+               }
+       };
+
+       lsqlite3->trans_count++;
+
+       return 0;
 }
 
+static int lsqlite3_end_trans(struct ldb_module *module)
+{
+       int ret;
+       char *errmsg;
+       struct lsqlite3_private *lsqlite3 = module->private_data;
+
+       if (lsqlite3->trans_count > 0) {
+               lsqlite3->trans_count--;
+       } else return -1;
+
+       if (lsqlite3->trans_count == 0) {
+               ret = sqlite3_exec(lsqlite3->sqlite, "COMMIT;", NULL, NULL, &errmsg);
+               if (ret != SQLITE_OK) {
+                       if (errmsg) {
+                               printf("lsqlite3_end_trans: error: %s\n", errmsg);
+                               free(errmsg);
+                       }
+                       return -1;
+               }
+       }
+
+        return 0;
+}
 
+static int lsqlite3_del_trans(struct ldb_module *module)
+{
+       struct lsqlite3_private *lsqlite3 = module->private_data;
 
+       if (lsqlite3->trans_count > 0) {
+               lsqlite3->trans_count--;
+       } else return -1;
+
+       if (lsqlite3->trans_count == 0) {
+               return lsqlite3_safe_rollback(lsqlite3->sqlite);
+       }
+
+       return -1;
+}
 
 /*
  * Static functions
  */
 
-static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
+static int initialize(struct lsqlite3_private *lsqlite3,
+                     struct ldb_context *ldb, const char *url, int flags)
 {
-        int ret;
+       TALLOC_CTX *local_ctx;
         long long queryInt;
-        const char *pTail;
-        sqlite3_stmt *stmt;
-        const char *schema =       
+       int rollback = 0;
+       char *errmsg;
+        char *schema;
+        int ret;
+
+       /* create a local ctx */
+       local_ctx = talloc_named(lsqlite3, 0, "lsqlite3_rename local context");
+       if (local_ctx == NULL) {
+               return -1;
+       }
+
+       schema = lsqlite3_tprintf(local_ctx,
                 
                 
                 "CREATE TABLE ldb_info AS "
@@ -1616,7 +1556,7 @@ static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
                 /*
                  * Triggers
                  */
-                
                 "CREATE TRIGGER ldb_object_classes_insert_tr"
                 "  AFTER INSERT"
                 "  ON ldb_object_classes"
@@ -1637,7 +1577,7 @@ static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
                 "        SET max_child_num = max_child_num + 1"
                 "        WHERE class_name = new.parent_class_name;"
                 "    END;"
-                
+
                 /*
                  * Table initialization
                  */
@@ -1645,9 +1585,7 @@ static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
                 "INSERT INTO ldb_object_classes "
                 "    (class_name, tree_key) "
                 "  VALUES "
-                "    ('TOP', '0001');"
-
-                ;
+                "    ('TOP', '0001');");
         
         /* Skip protocol indicator of url  */
         if (strncmp(url, "sqlite://", 9) != 0) {
@@ -1663,9 +1601,28 @@ static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
         }
         
         /* In case this is a new database, enable auto_vacuum */
-        if (query_norows(lsqlite3, "PRAGMA auto_vacuum=1;") != 0) {
-                        return -1;
-        }
+       ret = sqlite3_exec(lsqlite3->sqlite, "PRAGMA auto_vacuum = 1;", NULL, NULL, &errmsg);
+       if (ret != SQLITE_OK) {
+               if (errmsg) {
+                       printf("lsqlite3 initializaion error: %s\n", errmsg);
+                       free(errmsg);
+               }
+               goto failed;
+       }
+        
+       if (flags & LDB_FLG_NOSYNC) {
+               /* DANGEROUS */
+               ret = sqlite3_exec(lsqlite3->sqlite, "PRAGMA synchronous = OFF;", NULL, NULL, &errmsg);
+               if (ret != SQLITE_OK) {
+                       if (errmsg) {
+                               printf("lsqlite3 initializaion error: %s\n", errmsg);
+                               free(errmsg);
+                       }
+                       goto failed;
+               }
+       }
+        
+       /* */
         
         /* Establish a busy timeout of 30 seconds */
         if ((ret = sqlite3_busy_timeout(lsqlite3->sqlite,
@@ -1701,43 +1658,52 @@ static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
                 return ret;
         }
 
-        /* Begin a transaction */
-        if ((ret = query_norows(lsqlite3, "BEGIN EXCLUSIVE;")) != 0) {
-                        return ret;
+        /* Create a function, callable from sql, to perform various comparisons */
+        if ((ret =
+             sqlite3_create_function(lsqlite3->sqlite, /* handle */
+                                     "ldap_compare",   /* function name */
+                                     4,                /* number of args */
+                                     SQLITE_ANY,       /* preferred text type */
+                                     ldb  ,            /* user data */
+                                     lsqlite3_compare, /* called func */
+                                     NULL,             /* step func */
+                                     NULL              /* final func */
+                     )) != SQLITE_OK) {
+                return ret;
         }
-        
+
+        /* Begin a transaction */
+       ret = sqlite3_exec(lsqlite3->sqlite, "BEGIN EXCLUSIVE;", NULL, NULL, &errmsg);
+       if (ret != SQLITE_OK) {
+               if (errmsg) {
+                       printf("lsqlite3: initialization error: %s\n", errmsg);
+                       free(errmsg);
+               }
+               goto failed;
+       }
+       rollback = 1;
         /* Determine if this is a new database.  No tables means it is. */
         if (query_int(lsqlite3,
                       &queryInt,
                       "SELECT COUNT(*)\n"
                       "  FROM sqlite_master\n"
                       "  WHERE type = 'table';") != 0) {
-                query_norows(lsqlite3, "ROLLBACK;");
-                return -1;
+               goto failed;
         }
         
         if (queryInt == 0) {
                 /*
                  * Create the database schema
                  */
-                for (pTail = discard_const_p(char, schema);
-                     pTail != NULL && *pTail != '\0';
-                        ) {
-                        
-                        if ((ret = sqlite3_prepare(
-                                     lsqlite3->sqlite,
-                                     pTail,
-                                     -1,
-                                     &stmt,
-                                     &pTail)) != SQLITE_OK ||
-                            (ret = sqlite3_step(stmt)) != SQLITE_DONE ||
-                            (ret = sqlite3_finalize(stmt)) != SQLITE_OK) {
-                                
-                                query_norows(lsqlite3, "ROLLBACK;");
-                                (void) sqlite3_close(lsqlite3->sqlite);
-                                return ret;
-                        }
-                }
+               ret = sqlite3_exec(lsqlite3->sqlite, schema, NULL, NULL, &errmsg);
+               if (ret != SQLITE_OK) {
+                       if (errmsg) {
+                               printf("lsqlite3 initializaion error: %s\n", errmsg);
+                               free(errmsg);
+                       }
+                       goto failed;
+               }
         } else {
                 /*
                  * Ensure that the database we opened is one of ours
@@ -1763,34 +1729,26 @@ static int initialize(struct lsqlite3_private *lsqlite3, const char *url)
                     queryInt != 1) {
                         
                         /* It's not one that we created.  See ya! */
-                        query_norows(lsqlite3, "ROLLBACK;");
-                        (void) sqlite3_close(lsqlite3->sqlite);
-                        return SQLITE_MISUSE;
+                       goto failed;
                 }
         }
         
-        /*
-         * Create a temporary table to hold attributes requested in the result
-         * set of a search.
-         */
-        query_norows(lsqlite3, "DROP TABLE " RESULT_ATTR_TABLE ";\n");
-        if ((ret =
-             query_norows(lsqlite3,
-                          "CREATE " TEMPTAB " TABLE " RESULT_ATTR_TABLE "\n"
-                          " (\n"
-                          "  attr_name TEXT PRIMARY KEY\n"
-                          " );")) != 0) {
-                query_norows(lsqlite3, "ROLLBACK;");
-                return ret;
-        }
-
         /* Commit the transaction */
-        if ((ret = query_norows(lsqlite3, "COMMIT;")) != 0) {
-                query_norows(lsqlite3, "ROLLBACK;");
-                return ret;
-        }
-        
+       ret = sqlite3_exec(lsqlite3->sqlite, "COMMIT;", NULL, NULL, &errmsg);
+       if (ret != SQLITE_OK) {
+               if (errmsg) {
+                       printf("lsqlite3: iniialization error: %s\n", errmsg);
+                       free(errmsg);
+               }
+               goto failed;
+       }
         return SQLITE_OK;
+
+failed:
+       if (rollback) lsqlite3_safe_rollback(lsqlite3->sqlite); 
+       sqlite3_close(lsqlite3->sqlite);
+       return -1;
 }
 
 static int
@@ -1805,21 +1763,58 @@ destructor(void *p)
 }
 
 
+static int lsqlite3_request(struct ldb_module *module, struct ldb_request *req)
+{
+       /* check for oustanding critical controls and return an error if found */
+       if (check_critical_controls(req->controls)) {
+               return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
+       }
+       
+       switch (req->operation) {
+
+       case LDB_REQ_SEARCH:
+               return lsqlite3_search_bytree(module,
+                                         req->op.search.base,
+                                         req->op.search.scope, 
+                                         req->op.search.tree, 
+                                         req->op.search.attrs, 
+                                         &req->op.search.res);
+
+       case LDB_REQ_ADD:
+               return lsqlite3_add(module, req->op.add.message);
+
+       case LDB_REQ_MODIFY:
+               return lsqlite3_modify(module, req->op.mod.message);
+
+       case LDB_REQ_DELETE:
+               return lsqlite3_delete(module, req->op.del.dn);
+
+       case LDB_REQ_RENAME:
+               return lsqlite3_rename(module,
+                                       req->op.rename.olddn,
+                                       req->op.rename.newdn);
+
+       default:
+               return LDB_ERR_OPERATIONS_ERROR;
+
+       }
+}
+
+static int lsqlite3_init_2(struct ldb_module *module)
+{
+       return LDB_SUCCESS;
+}
 
 /*
  * Table of operations for the sqlite3 backend
  */
 static const struct ldb_module_ops lsqlite3_ops = {
-       .name          = "sqlite",
-       .search        = lsqlite3_search,
-       .search_bytree = lsqlite3_search_bytree,
-       .add_record    = lsqlite3_add,
-       .modify_record = lsqlite3_modify,
-       .delete_record = lsqlite3_delete,
-       .rename_record = lsqlite3_rename,
-       .named_lock    = lsqlite3_lock,
-       .named_unlock  = lsqlite3_unlock,
-       .errstring     = lsqlite3_errstring
+       .name              = "sqlite",
+       .request           = lsqlite3_request,
+       .start_transaction = lsqlite3_start_trans,
+       .end_transaction   = lsqlite3_end_trans,
+       .del_transaction   = lsqlite3_del_trans,
+       .second_stage_init = lsqlite3_init_2
 };
 
 /*
@@ -1841,9 +1836,9 @@ int lsqlite3_connect(struct ldb_context *ldb,
         
        lsqlite3->sqlite = NULL;
        lsqlite3->options = NULL;
-        lsqlite3->lock_count = 0;
+       lsqlite3->trans_count = 0;
         
-       ret = initialize(lsqlite3, url);
+       ret = initialize(lsqlite3, ldb, url, flags);
        if (ret != SQLITE_OK) {
                goto failed;
        }