Testing max, min and negative values for indexed 32 and 64 bit types.
This has to be done in two different files because the 64 bit type is
LDB_SYNTAX_INTEGER which is implemented at the ldb level, while the 32
bit is added in the ldb-samba module. Schema syntax binding added for
ldb-samba.
We also need to make sure that full scans are not invoked for LMDB.
Pair-programmed-with: Garming Sam <garming@catalyst.net.nz>
Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
#include "lib/ldb-samba/ldif_handlers.h"
#include "auth/pyauth.h"
#include "source4/dsdb/common/util.h"
+#include "lib/ldb/include/ldb_private.h"
static PyObject *pyldb_module;
Py_RETURN_NONE;
}
+static PyObject *py_ldb_samba_schema_attribute_add(PyLdbObject *self,
+ PyObject *args)
+{
+ char *attribute, *syntax;
+ const struct ldb_schema_syntax *s;
+ unsigned int flags;
+ int ret;
+ struct ldb_context *ldb_ctx;
+
+ if (!PyArg_ParseTuple(args, "sIs", &attribute, &flags, &syntax))
+ return NULL;
+
+ ldb_ctx = pyldb_Ldb_AsLdbContext(self);
+
+ s = ldb_samba_syntax_by_name(ldb_ctx, syntax);
+ ret = ldb_schema_attribute_add_with_syntax(ldb_ctx, attribute,
+ flags, s);
+
+ PyErr_LDB_ERROR_IS_ERR_RAISE(py_ldb_error, ret, ldb_ctx);
+
+ Py_RETURN_NONE;
+}
+
static PyObject *py_ldb_register_samba_handlers(PyObject *self)
{
struct ldb_context *ldb;
{ "set_session_info", (PyCFunction)py_ldb_set_session_info, METH_VARARGS,
"set_session_info(session_info)\n"
"Set session info to use when connecting." },
+ { "samba_schema_attribute_add",
+ (PyCFunction)py_ldb_samba_schema_attribute_add,
+ METH_VARARGS, NULL },
{ NULL },
};
.tp_flags = Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE,
};
+
MODULE_INIT_FUNC(_ldb)
{
PyObject *m;
Py_INCREF(&PySambaLdb);
PyModule_AddObject(m, "Ldb", (PyObject *)&PySambaLdb);
+#define ADD_LDB_STRING(val) PyModule_AddStringConstant(m, #val, LDB_## val)
+ ADD_LDB_STRING(SYNTAX_SAMBA_INT32);
+
return m;
}
--- /dev/null
+#!/usr/bin/env python3
+#
+# Tests for comparison expressions on indexed keys
+#
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+"""Tests for expressions containing comparisons on indexed attributes.
+ Copied from ldb's index.py"""
+
+import os
+from unittest import TestCase
+import sys
+from samba import _ldb
+import shutil
+from ldb import SCOPE_SUBTREE
+from samba.tests.subunitrun import TestProgram
+
+PY3 = sys.version_info > (3, 0)
+
+TDB_PREFIX = "tdb://"
+MDB_PREFIX = "mdb://"
+
+def tempdir():
+ import tempfile
+ try:
+ dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
+ except KeyError:
+ dir_prefix = None
+ return tempfile.mkdtemp(dir=dir_prefix)
+
+class LdbBaseTest(TestCase):
+ def setUp(self):
+ super(LdbBaseTest, self).setUp()
+ try:
+ if self.prefix is None:
+ self.prefix = TDB_PREFIX
+ except AttributeError:
+ self.prefix = TDB_PREFIX
+
+ def tearDown(self):
+ super(LdbBaseTest, self).tearDown()
+
+ def url(self):
+ return self.prefix + self.filename
+
+ def flags(self):
+ if self.prefix == MDB_PREFIX:
+ return ldb.FLG_NOSYNC
+ else:
+ return 0
+
+ def options(self):
+ if self.prefix == MDB_PREFIX:
+ return ['disable_full_db_scan_for_self_test:1']
+ else:
+ return None
+
+class LdbTDBIndexedComparisonExpressions(LdbBaseTest):
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(LdbTDBIndexedComparisonExpressions, self).tearDown()
+
+ # Ensure the LDB is closed now, so we close the FD
+ del(self.l)
+
+ def setUp(self):
+ super(LdbTDBIndexedComparisonExpressions, self).setUp()
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "indexedcomptest.ldb")
+ # Note that the maximum key length is set to 54
+ # This accounts for the 4 bytes added by the dn formatting
+ # a leading dn=, and a trailing zero terminator
+ #
+ self.l = _ldb.Ldb(self.url(), options=self.options())
+ self.l.add({"dn": "@ATTRIBUTES"})
+ self.l.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"int32attr"],
+ "@IDXONE": [b"1"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]})
+
+ def test_comparison_expression(self):
+ self.l.samba_schema_attribute_add("int32attr", 0,
+ _ldb.SYNTAX_SAMBA_INT32)
+
+ int32_max = 2**31-1
+ int32_min = -2**31
+ test_nums = list(range(-5, 5))
+ test_nums += list(range(int32_max-5, int32_max+1))
+ test_nums += list(range(int32_min, int32_min+5))
+ test_nums = sorted(test_nums)
+
+ for i in test_nums:
+ ouuid = 0x0123456789abcdef + i
+ ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode())
+ self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i),
+ "objectUUID": ouuid_s,
+ "int32attr": str(i)})
+
+ def assert_int32_expr(expr, py_expr=None):
+ res = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=SCOPE_SUBTREE,
+ expression="(int32attr%s)" % (expr))
+
+ if not py_expr:
+ py_expr = expr
+ expect = [n for n in test_nums if eval(str(n) + py_expr)]
+ vals = sorted([int(r.get("int32attr")[0]) for r in res])
+ self.assertEqual(len(res), len(expect))
+ self.assertEqual(set(vals), set(expect))
+ self.assertEqual(expect, vals)
+
+ assert_int32_expr(">=-2")
+ assert_int32_expr("<=2")
+ assert_int32_expr(">=" + str(int32_min))
+ assert_int32_expr("<=" + str(int32_min))
+ assert_int32_expr("<=" + str(int32_min+1))
+ assert_int32_expr("<=" + str(int32_max))
+ assert_int32_expr(">=" + str(int32_max))
+ assert_int32_expr(">=" + str(int32_max-1))
+ assert_int32_expr("=10", "==10")
+
+# Run the same tests against an lmdb backend
+class LdbLMDBIndexedComparisonExpressions(LdbTDBIndexedComparisonExpressions):
+
+ def setUp(self):
+ if os.environ.get('HAVE_LMDB', '1') == '0':
+ self.skipTest("No lmdb backend")
+ self.prefix = MDB_PREFIX
+ super(LdbLMDBIndexedComparisonExpressions, self).setUp()
+
+ def tearDown(self):
+ super(LdbLMDBIndexedComparisonExpressions, self).tearDown()
+
+
+TestProgram(module=__name__, opts=[])
super(MaxIndexKeyLengthTestsLmdb, self).tearDown()
+class OrderedIntegerRangeTests(LdbBaseTest):
+
+ def tearDown(self):
+ shutil.rmtree(self.testdir)
+ super(OrderedIntegerRangeTests, self).tearDown()
+
+ # Ensure the LDB is closed now, so we close the FD
+ del(self.l)
+
+ def setUp(self):
+ super(OrderedIntegerRangeTests, self).setUp()
+ self.testdir = tempdir()
+ self.filename = os.path.join(self.testdir, "ordered_integer_test.ldb")
+
+ self.l = ldb.Ldb(self.url(),
+ options=self.options())
+ self.l.add({"dn": "@ATTRIBUTES",
+ "int64attr": "ORDERED_INTEGER"})
+ self.l.add({"dn": "@INDEXLIST",
+ "@IDXATTR": [b"int64attr"],
+ "@IDXONE": [b"1"],
+ "@IDXGUID": [b"objectUUID"],
+ "@IDX_DN_GUID": [b"GUID"]})
+
+ def options(self):
+ if self.prefix == MDB_PREFIX:
+ return ['modules:rdn_name',
+ 'disable_full_db_scan_for_self_test:1']
+ else:
+ return ['modules:rdn_name']
+
+ def test_comparison_expression(self):
+ int64_max = 2**63-1
+ int64_min = -2**63
+ test_nums = list(range(-5, 5))
+ test_nums += list(range(int64_max-5, int64_max+1))
+ test_nums += list(range(int64_min, int64_min+5))
+ test_nums = sorted(test_nums)
+
+ for (i, num) in enumerate(test_nums):
+ ouuid = 0x0123456789abcdef + i
+ ouuid_s = bytes(('0' + hex(ouuid)[2:]).encode())
+ self.l.add({"dn": "OU=COMPTESTOU{},DC=SAMBA,DC=ORG".format(i),
+ "objectUUID": ouuid_s,
+ "int64attr": str(num)})
+
+ def assert_int64_expr(expr, py_expr=None):
+ res = self.l.search(base="DC=SAMBA,DC=ORG",
+ scope=ldb.SCOPE_SUBTREE,
+ expression="(int64attr%s)" % (expr))
+
+ if not py_expr:
+ py_expr = expr
+ expect = [n for n in test_nums if eval(str(n) + py_expr)]
+ vals = sorted([int(r.get("int64attr")[0]) for r in res])
+ self.assertEqual(len(res), len(expect))
+ self.assertEqual(set(vals), set(expect))
+ self.assertEqual(expect, vals)
+
+ assert_int64_expr(">=-2")
+ assert_int64_expr("<=2")
+ assert_int64_expr(">=" + str(int64_min))
+ assert_int64_expr("<=" + str(int64_min))
+ assert_int64_expr("<=" + str(int64_min+1))
+ assert_int64_expr("<=" + str(int64_max))
+ assert_int64_expr(">=" + str(int64_max))
+ assert_int64_expr(">=" + str(int64_max-1))
+ assert_int64_expr("=10", "==10")
+
+
+# Run the ordered integer range tests against an lmdb backend
+class OrderedIntegerRangeTestsLmdb(OrderedIntegerRangeTests):
+
+ def setUp(self):
+ if os.environ.get('HAVE_LMDB', '1') == '0':
+ self.skipTest("No lmdb backend")
+ self.prefix = MDB_PREFIX
+ super(OrderedIntegerRangeTestsLmdb, self).setUp()
+
+ def tearDown(self):
+ super(OrderedIntegerRangeTestsLmdb, self).tearDown()
+
+
# Run the index truncation tests against an lmdb backend
class RejectSubDBIndex(LdbBaseTest):
py_expr = "%d <= {%s} <= %d" % (n-1, field, n+1)
self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects)
+ half_n = int(n/2)
+
+ expr = "(%s<=%s)" % (field, half_n)
+ py_expr = "{%s} <= %d" % (field, half_n)
+ self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects)
+
+ expr = "(%s>=%s)" % (field, half_n)
+ py_expr = "{%s} >= %d" % (field, half_n)
+ self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects)
+
# Same test again for largeint and enum
def test_largeint_range(self):
self.test_int_range(self.largeint_f)
py_expr = "{%s} <= %d" % (field, search_num)
self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects)
+ expr = "(&(%s>=%d)(objectClass=user))" % (field, search_num)
+ py_expr = "{%s} >= %d" % (field, search_num)
+ self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects)
+
+
# If we're called independently then import subunit, get host from first
# arg and run. Otherwise, subunit ran us so just set host from env.
# We always try to run over LDAP rather than direct file, so that
plantestsuite_loadlist("samba4.urgent_replication.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/urgent_replication.py"), '$PREFIX_ABS/ad_dc_ntvfs/private/sam.ldb', '$LOADLIST', '$LISTOPT'])
plantestsuite_loadlist("samba4.ldap.dirsync.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(samba4srcdir, "dsdb/tests/python/dirsync.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
plantestsuite_loadlist("samba4.ldap.match_rules.python", "ad_dc_ntvfs", [python, os.path.join(srcdir(), "lib/ldb-samba/tests/match_rules.py"), '$PREFIX_ABS/ad_dc_ntvfs/private/sam.ldb', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
+plantestsuite("samba4.ldap.index.python", "none", [python, os.path.join(srcdir(), "lib/ldb-samba/tests/index.py")])
plantestsuite_loadlist("samba4.ldap.notification.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/notification.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
plantestsuite_loadlist("samba4.ldap.sites.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(samba4srcdir, "dsdb/tests/python/sites.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])