2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
8 sys.path.insert(0, "bin/python")
22 "@IDXGUID": [b"objectUUID"],
23 "@IDX_DN_GUID": [b"GUID"]
30 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
33 return tempfile.mkdtemp(dir=dir_prefix)
36 class NoContextTests(TestCase):
38 def test_valid_attr_name(self):
39 self.assertTrue(ldb.valid_attr_name("foo"))
40 self.assertFalse(ldb.valid_attr_name("24foo"))
42 def test_timestring(self):
43 self.assertEqual("19700101000000.0Z", ldb.timestring(0))
44 self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
46 self.assertEqual("00000101000000.0Z", ldb.timestring(-62167219200))
47 self.assertEqual("99991231235959.0Z", ldb.timestring(253402300799))
49 # should result with OSError EOVERFLOW from gmtime()
50 with self.assertRaises(OSError) as err:
51 ldb.timestring(-62167219201)
52 self.assertEqual(err.exception.errno, errno.EOVERFLOW)
53 with self.assertRaises(OSError) as err:
54 ldb.timestring(253402300800)
55 self.assertEqual(err.exception.errno, errno.EOVERFLOW)
56 with self.assertRaises(OSError) as err:
57 ldb.timestring(0x7fffffffffffffff)
58 self.assertEqual(err.exception.errno, errno.EOVERFLOW)
60 def test_string_to_time(self):
61 self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
62 self.assertEqual(-1, ldb.string_to_time("19691231235959.0Z"))
63 self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
65 self.assertEqual(-62167219200, ldb.string_to_time("00000101000000.0Z"))
66 self.assertEqual(253402300799, ldb.string_to_time("99991231235959.0Z"))
68 def test_binary_encode(self):
69 encoded = ldb.binary_encode(b'test\\x')
70 decoded = ldb.binary_decode(encoded)
71 self.assertEqual(decoded, b'test\\x')
73 encoded2 = ldb.binary_encode('test\\x')
74 self.assertEqual(encoded2, encoded)
77 class LdbBaseTest(TestCase):
79 super(LdbBaseTest, self).setUp()
81 if self.prefix is None:
82 self.prefix = TDB_PREFIX
83 except AttributeError:
84 self.prefix = TDB_PREFIX
87 super(LdbBaseTest, self).tearDown()
90 return self.prefix + self.filename
93 if self.prefix == MDB_PREFIX:
99 class SimpleLdb(LdbBaseTest):
102 super(SimpleLdb, self).setUp()
103 self.testdir = tempdir()
104 self.filename = os.path.join(self.testdir, "test.ldb")
105 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
107 self.ldb.add(self.index)
108 except AttributeError:
112 shutil.rmtree(self.testdir)
113 super(SimpleLdb, self).tearDown()
114 # Ensure the LDB is closed now, so we close the FD
117 def test_connect(self):
118 ldb.Ldb(self.url(), flags=self.flags())
120 def test_connect_none(self):
123 def test_connect_later(self):
125 x.connect(self.url(), flags=self.flags())
127 def test_connect_twice(self):
130 with self.assertRaises(ldb.LdbError):
131 x.connect(url, flags=self.flags())
133 def test_connect_twice_later(self):
137 x.connect(url, flags)
138 with self.assertRaises(ldb.LdbError):
139 x.connect(url, flags)
141 def test_connect_and_disconnect(self):
145 x.connect(url, flags)
147 x.connect(url, flags)
152 self.assertTrue(repr(x).startswith("<ldb connection"))
154 def test_set_create_perms(self):
156 x.set_create_perms(0o600)
158 def test_search(self):
159 l = ldb.Ldb(self.url(), flags=self.flags())
160 self.assertEqual(len(l.search()), 0)
162 def test_search_controls(self):
163 l = ldb.Ldb(self.url(), flags=self.flags())
164 self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
166 def test_utf8_ldb_Dn(self):
167 l = ldb.Ldb(self.url(), flags=self.flags())
168 dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
170 def test_utf8_encoded_ldb_Dn(self):
171 l = ldb.Ldb(self.url(), flags=self.flags())
172 dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
174 dn = ldb.Dn(l, dn_encoded_utf8)
175 except UnicodeDecodeError as e:
177 except TypeError as te:
178 p3errors = ["argument 2 must be str, not bytes",
179 "Can't convert 'bytes' object to str implicitly"]
180 self.assertIn(str(te), p3errors)
182 def test_search_attrs(self):
183 l = ldb.Ldb(self.url(), flags=self.flags())
184 self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
186 def test_search_string_dn(self):
187 l = ldb.Ldb(self.url(), flags=self.flags())
188 self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
190 def test_search_attr_string(self):
191 l = ldb.Ldb(self.url(), flags=self.flags())
192 self.assertRaises(TypeError, l.search, attrs="dc")
193 self.assertRaises(TypeError, l.search, attrs=b"dc")
195 def test_opaque(self):
196 l = ldb.Ldb(self.url(), flags=self.flags())
197 l.set_opaque("my_opaque", True)
198 self.assertTrue(l.get_opaque("my_opaque") is not None)
199 self.assertEqual(None, l.get_opaque("unknown"))
201 def test_opaque_bool(self):
202 """Test that we can set boolean opaque values."""
204 db = ldb.Ldb(self.url(), flags=self.flags())
207 db.set_opaque(name, False)
208 self.assertEqual(False, db.get_opaque(name))
210 db.set_opaque(name, True)
211 self.assertEqual(True, db.get_opaque(name))
213 def test_opaque_int(self):
214 """Test that we can set (positive) integer opaque values."""
216 db = ldb.Ldb(self.url(), flags=self.flags())
219 db.set_opaque(name, 0)
220 self.assertEqual(0, db.get_opaque(name))
222 db.set_opaque(name, 12345678)
223 self.assertEqual(12345678, db.get_opaque(name))
225 # Negative values can’t be set.
226 self.assertRaises(OverflowError, db.set_opaque, name, -99999)
228 def test_opaque_string(self):
229 """Test that we can set string opaque values."""
231 db = ldb.Ldb(self.url(), flags=self.flags())
234 db.set_opaque(name, "")
235 self.assertEqual("", db.get_opaque(name))
237 db.set_opaque(name, "foo bar")
238 self.assertEqual("foo bar", db.get_opaque(name))
240 def test_opaque_none(self):
241 """Test that we can set an opaque to None to effectively unset it."""
243 db = ldb.Ldb(self.url(), flags=self.flags())
246 # An opaque that has not been set is the same as None.
247 self.assertIsNone(db.get_opaque(name))
249 # Give the opaque a value.
250 db.set_opaque(name, 3)
251 self.assertEqual(3, db.get_opaque(name))
253 # Test that we can set the opaque to None to unset it.
254 db.set_opaque(name, None)
255 self.assertIsNone(db.get_opaque(name))
257 def test_opaque_unsupported(self):
258 """Test that trying to set unsupported values raises an error."""
260 db = ldb.Ldb(self.url(), flags=self.flags())
263 self.assertRaises(ValueError, db.set_opaque, name, [])
264 self.assertRaises(ValueError, db.set_opaque, name, ())
265 self.assertRaises(ValueError, db.set_opaque, name, 3.14)
266 self.assertRaises(ValueError, db.set_opaque, name, 3+2j)
267 self.assertRaises(ValueError, db.set_opaque, name, b'foo')
269 def test_search_scope_base_empty_db(self):
270 l = ldb.Ldb(self.url(), flags=self.flags())
271 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
274 def test_search_scope_onelevel_empty_db(self):
275 l = ldb.Ldb(self.url(), flags=self.flags())
276 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
277 ldb.SCOPE_ONELEVEL)), 0)
279 def test_delete(self):
280 l = ldb.Ldb(self.url(), flags=self.flags())
281 self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
283 def test_delete_w_unhandled_ctrl(self):
284 l = ldb.Ldb(self.url(), flags=self.flags())
286 m.dn = ldb.Dn(l, "dc=foo1")
288 m["objectUUID"] = b"0123456789abcdef"
290 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
293 def test_contains(self):
295 l = ldb.Ldb(name, flags=self.flags())
296 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
297 l = ldb.Ldb(name, flags=self.flags())
299 m.dn = ldb.Dn(l, "dc=foo3")
301 m["objectUUID"] = b"0123456789abcdef"
304 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
305 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
309 def test_get_config_basedn(self):
310 l = ldb.Ldb(self.url(), flags=self.flags())
311 self.assertEqual(None, l.get_config_basedn())
313 def test_get_root_basedn(self):
314 l = ldb.Ldb(self.url(), flags=self.flags())
315 self.assertEqual(None, l.get_root_basedn())
317 def test_get_schema_basedn(self):
318 l = ldb.Ldb(self.url(), flags=self.flags())
319 self.assertEqual(None, l.get_schema_basedn())
321 def test_get_default_basedn(self):
322 l = ldb.Ldb(self.url(), flags=self.flags())
323 self.assertEqual(None, l.get_default_basedn())
326 l = ldb.Ldb(self.url(), flags=self.flags())
328 m.dn = ldb.Dn(l, "dc=foo4")
330 m["objectUUID"] = b"0123456789abcdef"
331 self.assertEqual(len(l.search()), 0)
334 self.assertEqual(len(l.search()), 1)
336 l.delete(ldb.Dn(l, "dc=foo4"))
338 def test_search_iterator(self):
339 l = ldb.Ldb(self.url(), flags=self.flags())
340 s = l.search_iterator()
346 except RuntimeError as re:
351 except RuntimeError as re:
356 except RuntimeError as re:
359 s = l.search_iterator()
362 self.assertTrue(isinstance(me, ldb.Message))
365 self.assertEqual(len(r), 0)
366 self.assertEqual(count, 0)
369 m1.dn = ldb.Dn(l, "dc=foo4")
371 m1["objectUUID"] = b"0123456789abcdef"
374 s = l.search_iterator()
377 self.assertTrue(isinstance(me, ldb.Message))
381 self.assertEqual(len(r), 0)
382 self.assertEqual(len(msgs), 1)
383 self.assertEqual(msgs[0].dn, m1.dn)
386 m2.dn = ldb.Dn(l, "dc=foo5")
388 m2["objectUUID"] = b"0123456789abcdee"
391 s = l.search_iterator()
394 self.assertTrue(isinstance(me, ldb.Message))
398 self.assertEqual(len(r), 0)
399 self.assertEqual(len(msgs), 2)
400 if msgs[0].dn == m1.dn:
401 self.assertEqual(msgs[0].dn, m1.dn)
402 self.assertEqual(msgs[1].dn, m2.dn)
404 self.assertEqual(msgs[0].dn, m2.dn)
405 self.assertEqual(msgs[1].dn, m1.dn)
407 s = l.search_iterator()
410 self.assertTrue(isinstance(me, ldb.Message))
417 except RuntimeError as re:
420 self.assertTrue(isinstance(me, ldb.Message))
428 self.assertEqual(len(r), 0)
429 self.assertEqual(len(msgs), 2)
430 if msgs[0].dn == m1.dn:
431 self.assertEqual(msgs[0].dn, m1.dn)
432 self.assertEqual(msgs[1].dn, m2.dn)
434 self.assertEqual(msgs[0].dn, m2.dn)
435 self.assertEqual(msgs[1].dn, m1.dn)
437 l.delete(ldb.Dn(l, "dc=foo4"))
438 l.delete(ldb.Dn(l, "dc=foo5"))
440 def test_add_text(self):
441 l = ldb.Ldb(self.url(), flags=self.flags())
443 m.dn = ldb.Dn(l, "dc=foo4")
445 m["objectUUID"] = b"0123456789abcdef"
446 self.assertEqual(len(l.search()), 0)
449 self.assertEqual(len(l.search()), 1)
451 l.delete(ldb.Dn(l, "dc=foo4"))
453 def test_add_w_unhandled_ctrl(self):
454 l = ldb.Ldb(self.url(), flags=self.flags())
456 m.dn = ldb.Dn(l, "dc=foo4")
458 self.assertEqual(len(l.search()), 0)
459 self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
461 def test_add_dict(self):
462 l = ldb.Ldb(self.url(), flags=self.flags())
463 m = {"dn": ldb.Dn(l, "dc=foo5"),
465 "objectUUID": b"0123456789abcdef"}
466 self.assertEqual(len(l.search()), 0)
469 self.assertEqual(len(l.search()), 1)
471 l.delete(ldb.Dn(l, "dc=foo5"))
473 def test_add_dict_text(self):
474 l = ldb.Ldb(self.url(), flags=self.flags())
475 m = {"dn": ldb.Dn(l, "dc=foo5"),
477 "objectUUID": b"0123456789abcdef"}
478 self.assertEqual(len(l.search()), 0)
481 self.assertEqual(len(l.search()), 1)
483 l.delete(ldb.Dn(l, "dc=foo5"))
485 def test_add_dict_string_dn(self):
486 l = ldb.Ldb(self.url(), flags=self.flags())
487 m = {"dn": "dc=foo6", "bla": b"bla",
488 "objectUUID": b"0123456789abcdef"}
489 self.assertEqual(len(l.search()), 0)
492 self.assertEqual(len(l.search()), 1)
494 l.delete(ldb.Dn(l, "dc=foo6"))
496 def test_add_dict_bytes_dn(self):
497 l = ldb.Ldb(self.url(), flags=self.flags())
498 m = {"dn": b"dc=foo6", "bla": b"bla",
499 "objectUUID": b"0123456789abcdef"}
500 self.assertEqual(len(l.search()), 0)
503 self.assertEqual(len(l.search()), 1)
505 l.delete(ldb.Dn(l, "dc=foo6"))
507 def test_rename(self):
508 l = ldb.Ldb(self.url(), flags=self.flags())
510 m.dn = ldb.Dn(l, "dc=foo7")
512 m["objectUUID"] = b"0123456789abcdef"
513 self.assertEqual(len(l.search()), 0)
516 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
517 self.assertEqual(len(l.search()), 1)
519 l.delete(ldb.Dn(l, "dc=bar"))
521 def test_rename_string_dns(self):
522 l = ldb.Ldb(self.url(), flags=self.flags())
524 m.dn = ldb.Dn(l, "dc=foo8")
526 m["objectUUID"] = b"0123456789abcdef"
527 self.assertEqual(len(l.search()), 0)
529 self.assertEqual(len(l.search()), 1)
531 l.rename("dc=foo8", "dc=bar")
532 self.assertEqual(len(l.search()), 1)
534 l.delete(ldb.Dn(l, "dc=bar"))
536 def test_rename_bad_string_dns(self):
537 l = ldb.Ldb(self.url(), flags=self.flags())
539 m.dn = ldb.Dn(l, "dc=foo8")
541 m["objectUUID"] = b"0123456789abcdef"
542 self.assertEqual(len(l.search()), 0)
544 self.assertEqual(len(l.search()), 1)
545 self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
546 self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
547 l.delete(ldb.Dn(l, "dc=foo8"))
549 def test_empty_dn(self):
550 l = ldb.Ldb(self.url(), flags=self.flags())
551 self.assertEqual(0, len(l.search()))
553 m.dn = ldb.Dn(l, "dc=empty")
554 m["objectUUID"] = b"0123456789abcdef"
557 self.assertEqual(1, len(rm))
558 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
562 self.assertEqual(1, len(rm))
563 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
565 rm = l.search(m.dn, attrs=["blah"])
566 self.assertEqual(1, len(rm))
567 self.assertEqual(0, len(rm[0]))
569 def test_modify_delete(self):
570 l = ldb.Ldb(self.url(), flags=self.flags())
572 m.dn = ldb.Dn(l, "dc=modifydelete")
574 m["objectUUID"] = b"0123456789abcdef"
576 rm = l.search(m.dn)[0]
577 self.assertEqual([b"1234"], list(rm["bla"]))
580 m.dn = ldb.Dn(l, "dc=modifydelete")
581 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
582 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
585 self.assertEqual(1, len(rm))
586 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
588 rm = l.search(m.dn, attrs=["bla"])
589 self.assertEqual(1, len(rm))
590 self.assertEqual(0, len(rm[0]))
592 l.delete(ldb.Dn(l, "dc=modifydelete"))
594 def test_modify_delete_text(self):
595 l = ldb.Ldb(self.url(), flags=self.flags())
597 m.dn = ldb.Dn(l, "dc=modifydelete")
598 m.text["bla"] = ["1234"]
599 m["objectUUID"] = b"0123456789abcdef"
601 rm = l.search(m.dn)[0]
602 self.assertEqual(["1234"], list(rm.text["bla"]))
605 m.dn = ldb.Dn(l, "dc=modifydelete")
606 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
607 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
610 self.assertEqual(1, len(rm))
611 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
613 rm = l.search(m.dn, attrs=["bla"])
614 self.assertEqual(1, len(rm))
615 self.assertEqual(0, len(rm[0]))
617 l.delete(ldb.Dn(l, "dc=modifydelete"))
619 def test_modify_add(self):
620 l = ldb.Ldb(self.url(), flags=self.flags())
622 m.dn = ldb.Dn(l, "dc=add")
624 m["objectUUID"] = b"0123456789abcdef"
628 m.dn = ldb.Dn(l, "dc=add")
629 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
630 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
632 rm = l.search(m.dn)[0]
633 self.assertEqual(3, len(rm))
634 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
636 l.delete(ldb.Dn(l, "dc=add"))
638 def test_modify_add_text(self):
639 l = ldb.Ldb(self.url(), flags=self.flags())
641 m.dn = ldb.Dn(l, "dc=add")
642 m.text["bla"] = ["1234"]
643 m["objectUUID"] = b"0123456789abcdef"
647 m.dn = ldb.Dn(l, "dc=add")
648 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
649 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
651 rm = l.search(m.dn)[0]
652 self.assertEqual(3, len(rm))
653 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
655 l.delete(ldb.Dn(l, "dc=add"))
657 def test_modify_replace(self):
658 l = ldb.Ldb(self.url(), flags=self.flags())
660 m.dn = ldb.Dn(l, "dc=modify2")
661 m["bla"] = [b"1234", b"456"]
662 m["objectUUID"] = b"0123456789abcdef"
666 m.dn = ldb.Dn(l, "dc=modify2")
667 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
668 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
670 rm = l.search(m.dn)[0]
671 self.assertEqual(3, len(rm))
672 self.assertEqual([b"789"], list(rm["bla"]))
673 rm = l.search(m.dn, attrs=["bla"])[0]
674 self.assertEqual(1, len(rm))
676 l.delete(ldb.Dn(l, "dc=modify2"))
678 def test_modify_replace_text(self):
679 l = ldb.Ldb(self.url(), flags=self.flags())
681 m.dn = ldb.Dn(l, "dc=modify2")
682 m.text["bla"] = ["1234", "456"]
683 m["objectUUID"] = b"0123456789abcdef"
687 m.dn = ldb.Dn(l, "dc=modify2")
688 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
689 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
691 rm = l.search(m.dn)[0]
692 self.assertEqual(3, len(rm))
693 self.assertEqual(["789"], list(rm.text["bla"]))
694 rm = l.search(m.dn, attrs=["bla"])[0]
695 self.assertEqual(1, len(rm))
697 l.delete(ldb.Dn(l, "dc=modify2"))
699 def test_modify_flags_change(self):
700 l = ldb.Ldb(self.url(), flags=self.flags())
702 m.dn = ldb.Dn(l, "dc=add")
704 m["objectUUID"] = b"0123456789abcdef"
708 m.dn = ldb.Dn(l, "dc=add")
709 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
710 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
712 rm = l.search(m.dn)[0]
713 self.assertEqual(3, len(rm))
714 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
716 # Now create another modify, but switch the flags before we do it
717 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
718 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
720 rm = l.search(m.dn, attrs=["bla"])[0]
721 self.assertEqual(1, len(rm))
722 self.assertEqual([b"1234"], list(rm["bla"]))
724 l.delete(ldb.Dn(l, "dc=add"))
726 def test_modify_flags_change_text(self):
727 l = ldb.Ldb(self.url(), flags=self.flags())
729 m.dn = ldb.Dn(l, "dc=add")
730 m.text["bla"] = ["1234"]
731 m["objectUUID"] = b"0123456789abcdef"
735 m.dn = ldb.Dn(l, "dc=add")
736 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
737 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
739 rm = l.search(m.dn)[0]
740 self.assertEqual(3, len(rm))
741 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
743 # Now create another modify, but switch the flags before we do it
744 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
745 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
747 rm = l.search(m.dn, attrs=["bla"])[0]
748 self.assertEqual(1, len(rm))
749 self.assertEqual(["1234"], list(rm.text["bla"]))
751 l.delete(ldb.Dn(l, "dc=add"))
753 def test_transaction_commit(self):
754 l = ldb.Ldb(self.url(), flags=self.flags())
755 l.transaction_start()
756 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
758 m["objectUUID"] = b"0123456789abcdef"
760 l.transaction_commit()
763 def test_transaction_cancel(self):
764 l = ldb.Ldb(self.url(), flags=self.flags())
765 l.transaction_start()
766 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
768 m["objectUUID"] = b"0123456789abcdee"
770 l.transaction_cancel()
771 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
773 def test_set_debug(self):
774 def my_report_fn(level, text):
776 l = ldb.Ldb(self.url(), flags=self.flags())
777 l.set_debug(my_report_fn)
779 def test_zero_byte_string(self):
780 """Testing we do not get trapped in the \0 byte in a property string."""
781 l = ldb.Ldb(self.url(), flags=self.flags())
784 "objectclass": b"user",
785 "cN": b"LDAPtestUSER",
786 "givenname": b"ldap",
787 "displayname": b"foo\0bar",
788 "objectUUID": b"0123456789abcdef"
790 res = l.search(expression="(dn=dc=somedn)")
791 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
793 def test_no_crash_broken_expr(self):
794 l = ldb.Ldb(self.url(), flags=self.flags())
795 self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
797 # Run the SimpleLdb tests against an lmdb backend
800 class SimpleLdbLmdb(SimpleLdb):
803 if os.environ.get('HAVE_LMDB', '1') == '0':
804 self.skipTest("No lmdb backend")
805 self.prefix = MDB_PREFIX
806 self.index = MDB_INDEX_OBJ
807 super(SimpleLdbLmdb, self).setUp()
810 super(SimpleLdbLmdb, self).tearDown()
813 class SimpleLdbNoLmdb(LdbBaseTest):
816 if os.environ.get('HAVE_LMDB', '1') != '0':
817 self.skipTest("lmdb backend enabled")
818 self.prefix = MDB_PREFIX
819 self.index = MDB_INDEX_OBJ
820 super(SimpleLdbNoLmdb, self).setUp()
823 super(SimpleLdbNoLmdb, self).tearDown()
825 def test_lmdb_disabled(self):
826 self.testdir = tempdir()
827 self.filename = os.path.join(self.testdir, "test.ldb")
829 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
830 self.fail("Should have failed on missing LMDB")
831 except ldb.LdbError as err:
833 self.assertEqual(enum, ldb.ERR_OTHER)
836 class SearchTests(LdbBaseTest):
838 shutil.rmtree(self.testdir)
839 super(SearchTests, self).tearDown()
841 # Ensure the LDB is closed now, so we close the FD
845 super(SearchTests, self).setUp()
846 self.testdir = tempdir()
847 self.filename = os.path.join(self.testdir, "search_test.ldb")
848 options = ["modules:rdn_name"]
849 if hasattr(self, 'IDXCHECK'):
850 options.append("disable_full_db_scan_for_self_test:1")
851 self.l = ldb.Ldb(self.url(),
855 self.l.add(self.index)
856 except AttributeError:
859 self.l.add({"dn": "@ATTRIBUTES",
860 "DC": "CASE_INSENSITIVE"})
862 # Note that we can't use the name objectGUID here, as we
863 # want to stay clear of the objectGUID handler in LDB and
864 # instead use just the 16 bytes raw, which we just keep
865 # to printable chars here for ease of handling.
867 self.l.add({"dn": "DC=ORG",
869 "objectUUID": b"0000000000abcdef"})
870 self.l.add({"dn": "DC=EXAMPLE,DC=ORG",
872 "objectUUID": b"0000000001abcdef"})
873 self.l.add({"dn": "OU=OU1,DC=EXAMPLE,DC=ORG",
876 "objectUUID": b"0023456789abcde3"})
877 self.l.add({"dn": "OU=OU2,DC=EXAMPLE,DC=ORG",
880 "objectUUID": b"0023456789abcde4"})
881 self.l.add({"dn": "OU=OU3,DC=EXAMPLE,DC=ORG",
884 "objectUUID": b"0023456789abcde5"})
885 self.l.add({"dn": "OU=OU4,DC=EXAMPLE,DC=ORG",
888 "objectUUID": b"0023456789abcde6"})
889 self.l.add({"dn": "OU=OU5,DC=EXAMPLE,DC=ORG",
892 "objectUUID": b"0023456789abcde7"})
893 self.l.add({"dn": "OU=OU6,DC=EXAMPLE,DC=ORG",
896 "objectUUID": b"0023456789abcde8"})
897 self.l.add({"dn": "OU=OU7,DC=EXAMPLE,DC=ORG",
900 "objectUUID": b"0023456789abcde9"})
901 self.l.add({"dn": "OU=OU8,DC=EXAMPLE,DC=ORG",
904 "objectUUID": b"0023456789abcde0"})
905 self.l.add({"dn": "OU=OU9,DC=EXAMPLE,DC=ORG",
908 "objectUUID": b"0023456789abcdea"})
910 self.l.add({"dn": "DC=EXAMPLE,DC=COM",
912 "objectUUID": b"0000000011abcdef"})
914 self.l.add({"dn": "DC=EXAMPLE,DC=NET",
916 "objectUUID": b"0000000021abcdef"})
918 self.l.add({"dn": "OU=UNIQUE,DC=EXAMPLE,DC=NET",
919 "objectUUID": b"0000000022abcdef"})
921 self.l.add({"dn": "DC=SAMBA,DC=ORG",
922 "name": b"samba.org",
923 "objectUUID": b"0123456789abcdef"})
924 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
927 "objectUUID": b"0123456789abcde1"})
928 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
931 "objectUUID": b"0123456789abcde2"})
932 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
935 "objectUUID": b"0123456789abcde3"})
936 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
939 "objectUUID": b"0123456789abcde4"})
940 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
943 "objectUUID": b"0123456789abcde5"})
944 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
947 "objectUUID": b"0123456789abcde6"})
948 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
951 "objectUUID": b"0123456789abcde7"})
952 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
955 "objectUUID": b"0123456789abcde8"})
956 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
959 "objectUUID": b"0123456789abcde9"})
960 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
963 "objectUUID": b"0123456789abcde0"})
964 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
967 "objectUUID": b"0123456789abcdea"})
968 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
971 "objectUUID": b"0123456789abcdeb"})
972 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
975 "objectUUID": b"0123456789abcdec"})
976 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
979 "objectUUID": b"0123456789abcded"})
980 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
983 "objectUUID": b"0123456789abcdee"})
984 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
987 "objectUUID": b"0123456789abcd01"})
988 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
991 "objectUUID": b"0123456789abcd02"})
992 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
995 "objectUUID": b"0123456789abcd03"})
996 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
999 "objectUUID": b"0123456789abcd04"})
1000 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
1003 "objectUUID": b"0123456789abcd05"})
1004 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
1007 "objectUUID": b"0123456789abcd06"})
1008 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
1011 "objectUUID": b"0123456789abcd07"})
1012 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
1015 "objectUUID": b"0123456789abcd08"})
1016 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
1019 "objectUUID": b"0123456789abcd09"})
1021 def test_base(self):
1022 """Testing a search"""
1024 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1025 scope=ldb.SCOPE_BASE)
1026 self.assertEqual(len(res11), 1)
1028 def test_base_lower(self):
1029 """Testing a search"""
1031 res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
1032 scope=ldb.SCOPE_BASE)
1033 self.assertEqual(len(res11), 1)
1035 def test_base_or(self):
1036 """Testing a search"""
1038 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1039 scope=ldb.SCOPE_BASE,
1040 expression="(|(ou=ou11)(ou=ou12))")
1041 self.assertEqual(len(res11), 1)
1043 def test_base_or2(self):
1044 """Testing a search"""
1046 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1047 scope=ldb.SCOPE_BASE,
1048 expression="(|(x=y)(y=b))")
1049 self.assertEqual(len(res11), 1)
1051 def test_base_and(self):
1052 """Testing a search"""
1054 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1055 scope=ldb.SCOPE_BASE,
1056 expression="(&(ou=ou11)(ou=ou12))")
1057 self.assertEqual(len(res11), 0)
1059 def test_base_and2(self):
1060 """Testing a search"""
1062 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1063 scope=ldb.SCOPE_BASE,
1064 expression="(&(x=y)(y=a))")
1065 self.assertEqual(len(res11), 1)
1067 def test_base_false(self):
1068 """Testing a search"""
1070 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1071 scope=ldb.SCOPE_BASE,
1072 expression="(|(ou=ou13)(ou=ou12))")
1073 self.assertEqual(len(res11), 0)
1075 def test_check_base_false(self):
1076 """Testing a search"""
1077 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1078 scope=ldb.SCOPE_BASE,
1079 expression="(|(ou=ou13)(ou=ou12))")
1080 self.assertEqual(len(res11), 0)
1082 def test_check_base_error(self):
1083 """Testing a search"""
1084 checkbaseonsearch = {"dn": "@OPTIONS",
1085 "checkBaseOnSearch": b"TRUE"}
1087 self.l.add(checkbaseonsearch)
1088 except ldb.LdbError as err:
1090 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1091 m = ldb.Message.from_dict(self.l,
1096 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
1097 scope=ldb.SCOPE_BASE,
1098 expression="(|(ou=ou13)(ou=ou12))")
1099 self.fail("Should have failed on missing base")
1100 except ldb.LdbError as err:
1102 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1104 def test_subtree(self):
1105 """Testing a search"""
1108 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1109 scope=ldb.SCOPE_SUBTREE)
1110 if hasattr(self, 'IDXCHECK'):
1112 except ldb.LdbError as err:
1115 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1116 self.assertIn(estr, "ldb FULL SEARCH disabled")
1118 self.assertEqual(len(res11), 25)
1120 def test_subtree2(self):
1121 """Testing a search"""
1124 res11 = self.l.search(base="DC=ORG",
1125 scope=ldb.SCOPE_SUBTREE)
1126 if hasattr(self, 'IDXCHECK'):
1128 except ldb.LdbError as err:
1131 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1132 self.assertIn(estr, "ldb FULL SEARCH disabled")
1134 self.assertEqual(len(res11), 36)
1136 def test_subtree_and(self):
1137 """Testing a search"""
1139 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1140 scope=ldb.SCOPE_SUBTREE,
1141 expression="(&(ou=ou11)(ou=ou12))")
1142 self.assertEqual(len(res11), 0)
1144 def test_subtree_and2(self):
1145 """Testing a search"""
1147 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1148 scope=ldb.SCOPE_SUBTREE,
1149 expression="(&(x=y)(|(y=b)(y=c)))")
1150 self.assertEqual(len(res11), 1)
1152 def test_subtree_and2_lower(self):
1153 """Testing a search"""
1155 res11 = self.l.search(base="DC=samba,DC=org",
1156 scope=ldb.SCOPE_SUBTREE,
1157 expression="(&(x=y)(|(y=b)(y=c)))")
1158 self.assertEqual(len(res11), 1)
1160 def test_subtree_or(self):
1161 """Testing a search"""
1163 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1164 scope=ldb.SCOPE_SUBTREE,
1165 expression="(|(ou=ou11)(ou=ou12))")
1166 self.assertEqual(len(res11), 2)
1168 def test_subtree_or2(self):
1169 """Testing a search"""
1171 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1172 scope=ldb.SCOPE_SUBTREE,
1173 expression="(|(x=y)(y=b))")
1174 self.assertEqual(len(res11), 20)
1176 def test_subtree_or3(self):
1177 """Testing a search"""
1179 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1180 scope=ldb.SCOPE_SUBTREE,
1181 expression="(|(x=y)(y=b)(y=c))")
1182 self.assertEqual(len(res11), 22)
1184 def test_one_and(self):
1185 """Testing a search"""
1187 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1188 scope=ldb.SCOPE_ONELEVEL,
1189 expression="(&(ou=ou11)(ou=ou12))")
1190 self.assertEqual(len(res11), 0)
1192 def test_one_and2(self):
1193 """Testing a search"""
1195 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1196 scope=ldb.SCOPE_ONELEVEL,
1197 expression="(&(x=y)(y=b))")
1198 self.assertEqual(len(res11), 1)
1200 def test_one_or(self):
1201 """Testing a search"""
1203 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1204 scope=ldb.SCOPE_ONELEVEL,
1205 expression="(|(ou=ou11)(ou=ou12))")
1206 self.assertEqual(len(res11), 2)
1208 def test_one_or2(self):
1209 """Testing a search"""
1211 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1212 scope=ldb.SCOPE_ONELEVEL,
1213 expression="(|(x=y)(y=b))")
1214 self.assertEqual(len(res11), 20)
1216 def test_one_or2_lower(self):
1217 """Testing a search"""
1219 res11 = self.l.search(base="DC=samba,DC=org",
1220 scope=ldb.SCOPE_ONELEVEL,
1221 expression="(|(x=y)(y=b))")
1222 self.assertEqual(len(res11), 20)
1224 def test_one_unindexable(self):
1225 """Testing a search"""
1228 res11 = self.l.search(base="DC=samba,DC=org",
1229 scope=ldb.SCOPE_ONELEVEL,
1230 expression="(y=b*)")
1231 if hasattr(self, 'IDX') and \
1232 not hasattr(self, 'IDXONE') and \
1233 hasattr(self, 'IDXCHECK'):
1234 self.fail("Should have failed as un-indexed search")
1236 self.assertEqual(len(res11), 9)
1238 except ldb.LdbError as err:
1241 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1242 self.assertIn(estr, "ldb FULL SEARCH disabled")
1244 def test_one_unindexable_presence(self):
1245 """Testing a search"""
1248 res11 = self.l.search(base="DC=samba,DC=org",
1249 scope=ldb.SCOPE_ONELEVEL,
1251 if hasattr(self, 'IDX') and \
1252 not hasattr(self, 'IDXONE') and \
1253 hasattr(self, 'IDXCHECK'):
1254 self.fail("Should have failed as un-indexed search")
1256 self.assertEqual(len(res11), 24)
1258 except ldb.LdbError as err:
1261 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1262 self.assertIn(estr, "ldb FULL SEARCH disabled")
1264 def test_subtree_and_or(self):
1265 """Testing a search"""
1267 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1268 scope=ldb.SCOPE_SUBTREE,
1269 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1270 self.assertEqual(len(res11), 0)
1272 def test_subtree_and_or2(self):
1273 """Testing a search"""
1275 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1276 scope=ldb.SCOPE_SUBTREE,
1277 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1278 self.assertEqual(len(res11), 0)
1280 def test_subtree_and_or3(self):
1281 """Testing a search"""
1283 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1284 scope=ldb.SCOPE_SUBTREE,
1285 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1286 self.assertEqual(len(res11), 2)
1288 def test_subtree_and_or4(self):
1289 """Testing a search"""
1291 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1292 scope=ldb.SCOPE_SUBTREE,
1293 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1294 self.assertEqual(len(res11), 2)
1296 def test_subtree_and_or5(self):
1297 """Testing a search"""
1299 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1300 scope=ldb.SCOPE_SUBTREE,
1301 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1302 self.assertEqual(len(res11), 1)
1304 def test_subtree_or_and(self):
1305 """Testing a search"""
1307 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1308 scope=ldb.SCOPE_SUBTREE,
1309 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1310 self.assertEqual(len(res11), 10)
1312 def test_subtree_large_and_unique(self):
1313 """Testing a search"""
1315 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1316 scope=ldb.SCOPE_SUBTREE,
1317 expression="(&(ou=ou10)(y=a))")
1318 self.assertEqual(len(res11), 1)
1320 def test_subtree_unique(self):
1321 """Testing a search"""
1323 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1324 scope=ldb.SCOPE_SUBTREE,
1325 expression="(ou=ou10)")
1326 self.assertEqual(len(res11), 1)
1328 def test_subtree_unique_elsewhere(self):
1329 """Testing a search"""
1331 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1332 scope=ldb.SCOPE_SUBTREE,
1333 expression="(ou=ou10)")
1334 self.assertEqual(len(res11), 0)
1336 def test_subtree_unique_elsewhere2(self):
1337 """Testing a search"""
1339 res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1340 scope=ldb.SCOPE_SUBTREE,
1341 expression="(ou=unique)")
1342 self.assertEqual(len(res11), 1)
1344 def test_subtree_uni123_elsewhere(self):
1345 """Testing a search, where the search term contains a (normal ASCII)
1346 dotted-i, that will be upper-cased to 'Ä°', U+0130, LATIN
1347 CAPITAL LETTER I WITH DOT ABOVE in certain locales including
1348 tr_TR in which this test is sometimes run.
1350 The search term should fail because the ou does not exist, but
1351 we used to get it wrong in unindexed searches which stopped
1352 comparing at the i, ignoring the rest of the string, which is
1353 not the same as the existing ou ('123' != 'que').
1355 res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1356 scope=ldb.SCOPE_SUBTREE,
1357 expression="(ou=uni123)")
1358 self.assertEqual(len(res11), 0)
1360 def test_subtree_unique_elsewhere3(self):
1361 """Testing a search"""
1363 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1364 scope=ldb.SCOPE_SUBTREE,
1365 expression="(ou=unique)")
1366 self.assertEqual(len(res11), 0)
1368 def test_subtree_unique_elsewhere4(self):
1369 """Testing a search"""
1371 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1372 scope=ldb.SCOPE_SUBTREE,
1373 expression="(ou=unique)")
1374 self.assertEqual(len(res11), 0)
1376 def test_subtree_unique_elsewhere5(self):
1377 """Testing a search"""
1379 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1380 scope=ldb.SCOPE_SUBTREE,
1381 expression="(ou=unique)")
1382 self.assertEqual(len(res11), 0)
1384 def test_subtree_unique_elsewhere6(self):
1385 """Testing a search"""
1387 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1388 scope=ldb.SCOPE_SUBTREE,
1389 expression="(ou=unique)")
1390 self.assertEqual(len(res11), 0)
1392 def test_subtree_unique_elsewhere7(self):
1393 """Testing a search"""
1395 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1396 scope=ldb.SCOPE_SUBTREE,
1397 expression="(ou=ou10)")
1398 self.assertEqual(len(res11), 0)
1400 def test_subtree_unique_here(self):
1401 """Testing a search"""
1403 res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1404 scope=ldb.SCOPE_SUBTREE,
1405 expression="(ou=unique)")
1406 self.assertEqual(len(res11), 1)
1408 def test_subtree_and_none(self):
1409 """Testing a search"""
1411 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1412 scope=ldb.SCOPE_SUBTREE,
1413 expression="(&(ou=ouX)(y=a))")
1414 self.assertEqual(len(res11), 0)
1416 def test_subtree_and_idx_record(self):
1417 """Testing a search against the index record"""
1419 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1420 scope=ldb.SCOPE_SUBTREE,
1421 expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1422 self.assertEqual(len(res11), 0)
1424 def test_subtree_and_idxone_record(self):
1425 """Testing a search against the index record"""
1427 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1428 scope=ldb.SCOPE_SUBTREE,
1429 expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1430 self.assertEqual(len(res11), 0)
1432 def test_onelevel(self):
1433 """Testing a search"""
1436 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1437 scope=ldb.SCOPE_ONELEVEL)
1438 if hasattr(self, 'IDXCHECK') \
1439 and not hasattr(self, 'IDXONE'):
1441 except ldb.LdbError as err:
1444 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1445 self.assertIn(estr, "ldb FULL SEARCH disabled")
1447 self.assertEqual(len(res11), 24)
1449 def test_onelevel2(self):
1450 """Testing a search"""
1453 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1454 scope=ldb.SCOPE_ONELEVEL)
1455 if hasattr(self, 'IDXCHECK') \
1456 and not hasattr(self, 'IDXONE'):
1459 except ldb.LdbError as err:
1462 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1463 self.assertIn(estr, "ldb FULL SEARCH disabled")
1465 self.assertEqual(len(res11), 9)
1467 def test_onelevel_and_or(self):
1468 """Testing a search"""
1470 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1471 scope=ldb.SCOPE_ONELEVEL,
1472 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1473 self.assertEqual(len(res11), 0)
1475 def test_onelevel_and_or2(self):
1476 """Testing a search"""
1478 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1479 scope=ldb.SCOPE_ONELEVEL,
1480 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1481 self.assertEqual(len(res11), 0)
1483 def test_onelevel_and_or3(self):
1484 """Testing a search"""
1486 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1487 scope=ldb.SCOPE_ONELEVEL,
1488 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1489 self.assertEqual(len(res11), 2)
1491 def test_onelevel_and_or4(self):
1492 """Testing a search"""
1494 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1495 scope=ldb.SCOPE_ONELEVEL,
1496 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1497 self.assertEqual(len(res11), 2)
1499 def test_onelevel_and_or5(self):
1500 """Testing a search"""
1502 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1503 scope=ldb.SCOPE_ONELEVEL,
1504 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1505 self.assertEqual(len(res11), 1)
1507 def test_onelevel_or_and(self):
1508 """Testing a search"""
1510 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1511 scope=ldb.SCOPE_ONELEVEL,
1512 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1513 self.assertEqual(len(res11), 10)
1515 def test_onelevel_large_and_unique(self):
1516 """Testing a search"""
1518 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1519 scope=ldb.SCOPE_ONELEVEL,
1520 expression="(&(ou=ou10)(y=a))")
1521 self.assertEqual(len(res11), 1)
1523 def test_onelevel_unique(self):
1524 """Testing a search"""
1526 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1527 scope=ldb.SCOPE_ONELEVEL,
1528 expression="(ou=ou10)")
1529 self.assertEqual(len(res11), 1)
1531 def test_onelevel_unique_elsewhere(self):
1532 """Testing a search"""
1534 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1535 scope=ldb.SCOPE_ONELEVEL,
1536 expression="(ou=ou10)")
1537 self.assertEqual(len(res11), 0)
1539 def test_onelevel_unique_elsewhere2(self):
1540 """Testing a search (showing that onelevel is not subtree)"""
1542 res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1543 scope=ldb.SCOPE_ONELEVEL,
1544 expression="(ou=unique)")
1545 self.assertEqual(len(res11), 1)
1547 def test_onelevel_unique_elsewhere3(self):
1548 """Testing a search (showing that onelevel is not subtree)"""
1550 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1551 scope=ldb.SCOPE_ONELEVEL,
1552 expression="(ou=unique)")
1553 self.assertEqual(len(res11), 0)
1555 def test_onelevel_unique_elsewhere4(self):
1556 """Testing a search (showing that onelevel is not subtree)"""
1558 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1559 scope=ldb.SCOPE_ONELEVEL,
1560 expression="(ou=unique)")
1561 self.assertEqual(len(res11), 0)
1563 def test_onelevel_unique_elsewhere5(self):
1564 """Testing a search (showing that onelevel is not subtree)"""
1566 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1567 scope=ldb.SCOPE_ONELEVEL,
1568 expression="(ou=unique)")
1569 self.assertEqual(len(res11), 0)
1571 def test_onelevel_unique_elsewhere6(self):
1572 """Testing a search"""
1574 res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1575 scope=ldb.SCOPE_ONELEVEL,
1576 expression="(ou=ou10)")
1577 self.assertEqual(len(res11), 0)
1579 def test_onelevel_unique_here(self):
1580 """Testing a search"""
1582 res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1583 scope=ldb.SCOPE_ONELEVEL,
1584 expression="(ou=unique)")
1585 self.assertEqual(len(res11), 0)
1587 def test_onelevel_and_none(self):
1588 """Testing a search"""
1590 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1591 scope=ldb.SCOPE_ONELEVEL,
1592 expression="(&(ou=ouX)(y=a))")
1593 self.assertEqual(len(res11), 0)
1595 def test_onelevel_and_idx_record(self):
1596 """Testing a search against the index record"""
1598 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1599 scope=ldb.SCOPE_ONELEVEL,
1600 expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1601 self.assertEqual(len(res11), 0)
1603 def test_onelevel_and_idxone_record(self):
1604 """Testing a search against the index record"""
1606 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1607 scope=ldb.SCOPE_ONELEVEL,
1608 expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1609 self.assertEqual(len(res11), 0)
1611 def test_subtree_unindexable(self):
1612 """Testing a search"""
1615 res11 = self.l.search(base="DC=samba,DC=org",
1616 scope=ldb.SCOPE_SUBTREE,
1617 expression="(y=b*)")
1618 if hasattr(self, 'IDX') and \
1619 hasattr(self, 'IDXCHECK'):
1620 self.fail("Should have failed as un-indexed search")
1622 self.assertEqual(len(res11), 9)
1624 except ldb.LdbError as err:
1627 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1628 self.assertIn(estr, "ldb FULL SEARCH disabled")
1630 def test_onelevel_only_and_or(self):
1631 """Testing a search (showing that onelevel is not subtree)"""
1633 res11 = self.l.search(base="DC=ORG",
1634 scope=ldb.SCOPE_ONELEVEL,
1635 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1636 self.assertEqual(len(res11), 0)
1638 def test_onelevel_only_and_or2(self):
1639 """Testing a search (showing that onelevel is not subtree)"""
1641 res11 = self.l.search(base="DC=ORG",
1642 scope=ldb.SCOPE_ONELEVEL,
1643 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1644 self.assertEqual(len(res11), 0)
1646 def test_onelevel_only_and_or3(self):
1647 """Testing a search (showing that onelevel is not subtree)"""
1649 res11 = self.l.search(base="DC=ORG",
1650 scope=ldb.SCOPE_ONELEVEL,
1651 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1652 self.assertEqual(len(res11), 0)
1654 def test_onelevel_only_and_or4(self):
1655 """Testing a search (showing that onelevel is not subtree)"""
1657 res11 = self.l.search(base="DC=ORG",
1658 scope=ldb.SCOPE_ONELEVEL,
1659 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1660 self.assertEqual(len(res11), 0)
1662 def test_onelevel_only_and_or5(self):
1663 """Testing a search (showing that onelevel is not subtree)"""
1665 res11 = self.l.search(base="DC=ORG",
1666 scope=ldb.SCOPE_ONELEVEL,
1667 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1668 self.assertEqual(len(res11), 0)
1670 def test_onelevel_only_or_and(self):
1671 """Testing a search (showing that onelevel is not subtree)"""
1673 res11 = self.l.search(base="DC=ORG",
1674 scope=ldb.SCOPE_ONELEVEL,
1675 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1676 self.assertEqual(len(res11), 0)
1678 def test_onelevel_only_large_and_unique(self):
1679 """Testing a search (showing that onelevel is not subtree)"""
1681 res11 = self.l.search(base="DC=ORG",
1682 scope=ldb.SCOPE_ONELEVEL,
1683 expression="(&(ou=ou10)(y=a))")
1684 self.assertEqual(len(res11), 0)
1686 def test_onelevel_only_unique(self):
1687 """Testing a search (showing that onelevel is not subtree)"""
1689 res11 = self.l.search(base="DC=ORG",
1690 scope=ldb.SCOPE_ONELEVEL,
1691 expression="(ou=ou10)")
1692 self.assertEqual(len(res11), 0)
1694 def test_onelevel_only_unique2(self):
1695 """Testing a search"""
1697 res11 = self.l.search(base="DC=ORG",
1698 scope=ldb.SCOPE_ONELEVEL,
1699 expression="(ou=unique)")
1700 self.assertEqual(len(res11), 0)
1702 def test_onelevel_only_and_none(self):
1703 """Testing a search (showing that onelevel is not subtree)"""
1705 res11 = self.l.search(base="DC=ORG",
1706 scope=ldb.SCOPE_ONELEVEL,
1707 expression="(&(ou=ouX)(y=a))")
1708 self.assertEqual(len(res11), 0)
1710 def test_onelevel_small_and_or(self):
1711 """Testing a search (showing that onelevel is not subtree)"""
1713 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1714 scope=ldb.SCOPE_ONELEVEL,
1715 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1716 self.assertEqual(len(res11), 0)
1718 def test_onelevel_small_and_or2(self):
1719 """Testing a search (showing that onelevel is not subtree)"""
1721 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1722 scope=ldb.SCOPE_ONELEVEL,
1723 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1724 self.assertEqual(len(res11), 0)
1726 def test_onelevel_small_and_or3(self):
1727 """Testing a search (showing that onelevel is not subtree)"""
1729 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1730 scope=ldb.SCOPE_ONELEVEL,
1731 expression="(&(|(ou=ou1)(ou=ou2))(|(x=y)(y=b)(y=c)))")
1732 self.assertEqual(len(res11), 2)
1734 def test_onelevel_small_and_or4(self):
1735 """Testing a search (showing that onelevel is not subtree)"""
1737 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1738 scope=ldb.SCOPE_ONELEVEL,
1739 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou1)(ou=ou2)))")
1740 self.assertEqual(len(res11), 2)
1742 def test_onelevel_small_and_or5(self):
1743 """Testing a search (showing that onelevel is not subtree)"""
1745 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1746 scope=ldb.SCOPE_ONELEVEL,
1747 expression="(&(|(x=y)(y=b)(y=c))(ou=ou1))")
1748 self.assertEqual(len(res11), 1)
1750 def test_onelevel_small_or_and(self):
1751 """Testing a search (showing that onelevel is not subtree)"""
1753 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1754 scope=ldb.SCOPE_ONELEVEL,
1755 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1756 self.assertEqual(len(res11), 2)
1758 def test_onelevel_small_large_and_unique(self):
1759 """Testing a search (showing that onelevel is not subtree)"""
1761 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1762 scope=ldb.SCOPE_ONELEVEL,
1763 expression="(&(ou=ou9)(y=a))")
1764 self.assertEqual(len(res11), 1)
1766 def test_onelevel_small_unique_elsewhere(self):
1767 """Testing a search (showing that onelevel is not subtree)"""
1769 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1770 scope=ldb.SCOPE_ONELEVEL,
1771 expression="(ou=ou10)")
1772 self.assertEqual(len(res11), 0)
1774 def test_onelevel_small_and_none(self):
1775 """Testing a search (showing that onelevel is not subtree)"""
1777 res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1778 scope=ldb.SCOPE_ONELEVEL,
1779 expression="(&(ou=ouX)(y=a))")
1780 self.assertEqual(len(res11), 0)
1782 def test_subtree_unindexable_presence(self):
1783 """Testing a search"""
1786 res11 = self.l.search(base="DC=samba,DC=org",
1787 scope=ldb.SCOPE_SUBTREE,
1789 if hasattr(self, 'IDX') and \
1790 hasattr(self, 'IDXCHECK'):
1791 self.fail("Should have failed as un-indexed search")
1793 self.assertEqual(len(res11), 24)
1795 except ldb.LdbError as err:
1798 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1799 self.assertIn(estr, "ldb FULL SEARCH disabled")
1801 def test_dn_filter_one(self):
1802 """Testing that a dn= filter succeeds
1803 (or fails with disallowDNFilter
1804 set and IDXGUID or (IDX and not IDXONE) mode)
1805 when the scope is SCOPE_ONELEVEL.
1807 This should be made more consistent, but for now lock in
1812 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1813 scope=ldb.SCOPE_ONELEVEL,
1814 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1815 if hasattr(self, 'disallowDNFilter') and \
1816 hasattr(self, 'IDX') and \
1817 (hasattr(self, 'IDXGUID') or
1818 ((not hasattr(self, 'IDXONE') and hasattr(self, 'IDX')))):
1819 self.assertEqual(len(res11), 0)
1821 self.assertEqual(len(res11), 1)
1823 def test_dn_filter_subtree(self):
1824 """Testing that a dn= filter succeeds
1825 (or fails with disallowDNFilter set)
1826 when the scope is SCOPE_SUBTREE"""
1828 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1829 scope=ldb.SCOPE_SUBTREE,
1830 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1831 if hasattr(self, 'disallowDNFilter') \
1832 and hasattr(self, 'IDX'):
1833 self.assertEqual(len(res11), 0)
1835 self.assertEqual(len(res11), 1)
1837 def test_dn_filter_base(self):
1838 """Testing that (incorrectly) a dn= filter works
1839 when the scope is SCOPE_BASE"""
1841 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1842 scope=ldb.SCOPE_BASE,
1843 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1845 # At some point we should fix this, but it isn't trivial
1846 self.assertEqual(len(res11), 1)
1848 def test_distinguishedName_filter_one(self):
1849 """Testing that a distinguishedName= filter succeeds
1850 when the scope is SCOPE_ONELEVEL.
1852 This should be made more consistent, but for now lock in
1857 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1858 scope=ldb.SCOPE_ONELEVEL,
1859 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1860 self.assertEqual(len(res11), 1)
1862 def test_distinguishedName_filter_subtree(self):
1863 """Testing that a distinguishedName= filter succeeds
1864 when the scope is SCOPE_SUBTREE"""
1866 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1867 scope=ldb.SCOPE_SUBTREE,
1868 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1869 self.assertEqual(len(res11), 1)
1871 def test_distinguishedName_filter_base(self):
1872 """Testing that (incorrectly) a distinguishedName= filter works
1873 when the scope is SCOPE_BASE"""
1875 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1876 scope=ldb.SCOPE_BASE,
1877 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1879 # At some point we should fix this, but it isn't trivial
1880 self.assertEqual(len(res11), 1)
1882 def test_bad_dn_filter_base(self):
1883 """Testing that a dn= filter on an invalid DN works
1884 when the scope is SCOPE_BASE but
1885 returns zero results"""
1887 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1888 scope=ldb.SCOPE_BASE,
1889 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1891 # At some point we should fix this, but it isn't trivial
1892 self.assertEqual(len(res11), 0)
1895 def test_bad_dn_filter_one(self):
1896 """Testing that a dn= filter succeeds but returns zero
1897 results when the DN is not valid on a SCOPE_ONELEVEL search
1901 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1902 scope=ldb.SCOPE_ONELEVEL,
1903 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1904 self.assertEqual(len(res11), 0)
1906 def test_bad_dn_filter_subtree(self):
1907 """Testing that a dn= filter succeeds but returns zero
1908 results when the DN is not valid on a SCOPE_SUBTREE search
1912 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1913 scope=ldb.SCOPE_SUBTREE,
1914 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1915 self.assertEqual(len(res11), 0)
1917 def test_bad_distinguishedName_filter_base(self):
1918 """Testing that a distinguishedName= filter on an invalid DN works
1919 when the scope is SCOPE_BASE but
1920 returns zero results"""
1922 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1923 scope=ldb.SCOPE_BASE,
1924 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1926 # At some point we should fix this, but it isn't trivial
1927 self.assertEqual(len(res11), 0)
1930 def test_bad_distinguishedName_filter_one(self):
1931 """Testing that a distinguishedName= filter succeeds but returns zero
1932 results when the DN is not valid on a SCOPE_ONELEVEL search
1936 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1937 scope=ldb.SCOPE_ONELEVEL,
1938 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1939 self.assertEqual(len(res11), 0)
1941 def test_bad_distinguishedName_filter_subtree(self):
1942 """Testing that a distinguishedName= filter succeeds but returns zero
1943 results when the DN is not valid on a SCOPE_SUBTREE search
1947 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1948 scope=ldb.SCOPE_SUBTREE,
1949 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1950 self.assertEqual(len(res11), 0)
1952 def test_bad_dn_search_base(self):
1953 """Testing with a bad base DN (SCOPE_BASE)"""
1956 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1957 scope=ldb.SCOPE_BASE)
1958 self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1959 except ldb.LdbError as err:
1961 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1964 def test_bad_dn_search_one(self):
1965 """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1968 res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1969 scope=ldb.SCOPE_ONELEVEL)
1970 self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1971 except ldb.LdbError as err:
1973 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1975 def test_bad_dn_search_subtree(self):
1976 """Testing with a bad base DN (SCOPE_SUBTREE)"""
1979 res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1980 scope=ldb.SCOPE_SUBTREE)
1981 self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1982 except ldb.LdbError as err:
1984 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1988 # Run the search tests against an lmdb backend
1989 class SearchTestsLmdb(SearchTests):
1992 if os.environ.get('HAVE_LMDB', '1') == '0':
1993 self.skipTest("No lmdb backend")
1994 self.prefix = MDB_PREFIX
1995 self.index = MDB_INDEX_OBJ
1996 super(SearchTestsLmdb, self).setUp()
1999 super(SearchTestsLmdb, self).tearDown()
2002 class IndexedSearchTests(SearchTests):
2003 """Test searches using the index, to ensure the index doesn't
2007 super(IndexedSearchTests, self).setUp()
2008 self.l.add({"dn": "@INDEXLIST",
2009 "@IDXATTR": [b"x", b"y", b"ou"]})
2013 class IndexedCheckSearchTests(IndexedSearchTests):
2014 """Test searches using the index, to ensure the index doesn't
2015 break things (full scan disabled)"""
2018 self.IDXCHECK = True
2019 super(IndexedCheckSearchTests, self).setUp()
2022 class IndexedSearchDnFilterTests(SearchTests):
2023 """Test searches using the index, to ensure the index doesn't
2027 super(IndexedSearchDnFilterTests, self).setUp()
2028 self.l.add({"dn": "@OPTIONS",
2029 "disallowDNFilter": "TRUE"})
2030 self.disallowDNFilter = True
2032 self.l.add({"dn": "@INDEXLIST",
2033 "@IDXATTR": [b"x", b"y", b"ou"]})
2037 class IndexedAndOneLevelSearchTests(SearchTests):
2038 """Test searches using the index including @IDXONE, to ensure
2039 the index doesn't break things"""
2042 super(IndexedAndOneLevelSearchTests, self).setUp()
2043 self.l.add({"dn": "@INDEXLIST",
2044 "@IDXATTR": [b"x", b"y", b"ou"],
2050 class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
2051 """Test searches using the index including @IDXONE, to ensure
2052 the index doesn't break things (full scan disabled)"""
2055 self.IDXCHECK = True
2056 super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
2059 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
2060 """Test searches using the index including @IDXONE, to ensure
2061 the index doesn't break things"""
2064 super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
2065 self.l.add({"dn": "@OPTIONS",
2066 "disallowDNFilter": "TRUE",
2067 "checkBaseOnSearch": "TRUE"})
2068 self.disallowDNFilter = True
2069 self.checkBaseOnSearch = True
2071 self.l.add({"dn": "@INDEXLIST",
2072 "@IDXATTR": [b"x", b"y", b"ou"],
2078 class GUIDIndexedSearchTests(SearchTests):
2079 """Test searches using the index, to ensure the index doesn't
2083 self.index = {"dn": "@INDEXLIST",
2084 "@IDXATTR": [b"x", b"y", b"ou"],
2085 "@IDXGUID": [b"objectUUID"],
2086 "@IDX_DN_GUID": [b"GUID"]}
2087 super(GUIDIndexedSearchTests, self).setUp()
2092 class GUIDIndexedDNFilterSearchTests(SearchTests):
2093 """Test searches using the index, to ensure the index doesn't
2097 self.index = {"dn": "@INDEXLIST",
2098 "@IDXATTR": [b"x", b"y", b"ou"],
2099 "@IDXGUID": [b"objectUUID"],
2100 "@IDX_DN_GUID": [b"GUID"]}
2101 super(GUIDIndexedDNFilterSearchTests, self).setUp()
2102 self.l.add({"dn": "@OPTIONS",
2103 "disallowDNFilter": "TRUE",
2104 "checkBaseOnSearch": "TRUE"})
2105 self.disallowDNFilter = True
2106 self.checkBaseOnSearch = True
2111 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
2112 """Test searches using the index including @IDXONE, to ensure
2113 the index doesn't break things"""
2116 self.index = {"dn": "@INDEXLIST",
2117 "@IDXATTR": [b"x", b"y", b"ou"],
2119 "@IDXGUID": [b"objectUUID"],
2120 "@IDX_DN_GUID": [b"GUID"]}
2121 super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
2122 self.l.add({"dn": "@OPTIONS",
2123 "disallowDNFilter": "TRUE",
2124 "checkBaseOnSearch": "TRUE"})
2125 self.disallowDNFilter = True
2126 self.checkBaseOnSearch = True
2132 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
2135 if os.environ.get('HAVE_LMDB', '1') == '0':
2136 self.skipTest("No lmdb backend")
2137 self.prefix = MDB_PREFIX
2138 super(GUIDIndexedSearchTestsLmdb, self).setUp()
2141 super(GUIDIndexedSearchTestsLmdb, self).tearDown()
2144 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
2147 if os.environ.get('HAVE_LMDB', '1') == '0':
2148 self.skipTest("No lmdb backend")
2149 self.prefix = MDB_PREFIX
2150 super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
2153 super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
2156 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
2159 if os.environ.get('HAVE_LMDB', '1') == '0':
2160 self.skipTest("No lmdb backend")
2161 self.prefix = MDB_PREFIX
2162 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
2165 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
2168 class AddModifyTests(LdbBaseTest):
2170 shutil.rmtree(self.testdir)
2171 super(AddModifyTests, self).tearDown()
2173 # Ensure the LDB is closed now, so we close the FD
2177 super(AddModifyTests, self).setUp()
2178 self.testdir = tempdir()
2179 self.filename = os.path.join(self.testdir, "add_test.ldb")
2180 self.l = ldb.Ldb(self.url(),
2182 options=["modules:rdn_name"])
2184 self.l.add(self.index)
2185 except AttributeError:
2188 self.l.add({"dn": "DC=SAMBA,DC=ORG",
2189 "name": b"samba.org",
2190 "objectUUID": b"0123456789abcdef"})
2191 self.l.add({"dn": "@ATTRIBUTES",
2192 "objectUUID": "UNIQUE_INDEX"})
2194 def test_add_dup(self):
2195 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2198 "objectUUID": b"0123456789abcde1"})
2200 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2203 "objectUUID": b"0123456789abcde2"})
2204 self.fail("Should have failed adding duplicate entry")
2205 except ldb.LdbError as err:
2207 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2209 def test_add_bad(self):
2211 self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
2214 "objectUUID": b"0123456789abcde1"})
2215 self.fail("Should have failed adding entry with invalid DN")
2216 except ldb.LdbError as err:
2218 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2220 def test_add_del_add(self):
2221 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2224 "objectUUID": b"0123456789abcde1"})
2225 self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
2226 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2229 "objectUUID": b"0123456789abcde2"})
2231 def test_add_move_add(self):
2232 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2235 "objectUUID": b"0123456789abcde1"})
2236 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2237 "OU=DUP2,DC=SAMBA,DC=ORG")
2238 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2241 "objectUUID": b"0123456789abcde2"})
2243 def test_add_move_fail_move_move(self):
2244 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2247 "objectUUID": b"0123456789abcde1"})
2248 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2251 "objectUUID": b"0123456789abcde2"})
2253 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
2254 scope=ldb.SCOPE_SUBTREE,
2255 expression="(objectUUID=0123456789abcde1)")
2256 self.assertEqual(len(res2), 1)
2257 self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
2259 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
2260 scope=ldb.SCOPE_SUBTREE,
2261 expression="(objectUUID=0123456789abcde2)")
2262 self.assertEqual(len(res3), 1)
2263 self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
2266 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2267 "OU=DUP2,DC=SAMBA,DC=ORG")
2268 self.fail("Should have failed on duplicate DN")
2269 except ldb.LdbError as err:
2271 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2273 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
2274 "OU=DUP3,DC=SAMBA,DC=ORG")
2276 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2277 "OU=DUP2,DC=SAMBA,DC=ORG")
2279 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
2280 scope=ldb.SCOPE_SUBTREE,
2281 expression="(objectUUID=0123456789abcde1)")
2282 self.assertEqual(len(res2), 1)
2283 self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
2285 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
2286 scope=ldb.SCOPE_SUBTREE,
2287 expression="(objectUUID=0123456789abcde2)")
2288 self.assertEqual(len(res3), 1)
2289 self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
2291 def test_move_missing(self):
2293 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2294 "OU=DUP2,DC=SAMBA,DC=ORG")
2295 self.fail("Should have failed on missing")
2296 except ldb.LdbError as err:
2298 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
2300 def test_move_missing2(self):
2301 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2304 "objectUUID": b"0123456789abcde2"})
2307 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2308 "OU=DUP2,DC=SAMBA,DC=ORG")
2309 self.fail("Should have failed on missing")
2310 except ldb.LdbError as err:
2312 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
2314 def test_move_bad(self):
2315 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2318 "objectUUID": b"0123456789abcde2"})
2321 self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
2322 "OU=DUP2,DC=SAMBA,DC=ORG")
2323 self.fail("Should have failed on invalid DN")
2324 except ldb.LdbError as err:
2326 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2328 def test_move_bad2(self):
2329 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2332 "objectUUID": b"0123456789abcde2"})
2335 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2336 "OUXDUP2,DC=SAMBA,DC=ORG")
2337 self.fail("Should have failed on missing")
2338 except ldb.LdbError as err:
2340 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2342 def test_move_fail_move_add(self):
2343 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2346 "objectUUID": b"0123456789abcde1"})
2347 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2350 "objectUUID": b"0123456789abcde2"})
2352 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2353 "OU=DUP2,DC=SAMBA,DC=ORG")
2354 self.fail("Should have failed on duplicate DN")
2355 except ldb.LdbError as err:
2357 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2359 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
2360 "OU=DUP3,DC=SAMBA,DC=ORG")
2362 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2365 "objectUUID": b"0123456789abcde3"})
2368 class AddModifyTestsLmdb(AddModifyTests):
2371 if os.environ.get('HAVE_LMDB', '1') == '0':
2372 self.skipTest("No lmdb backend")
2373 self.prefix = MDB_PREFIX
2374 self.index = MDB_INDEX_OBJ
2375 super(AddModifyTestsLmdb, self).setUp()
2378 super(AddModifyTestsLmdb, self).tearDown()
2381 class IndexedAddModifyTests(AddModifyTests):
2382 """Test searches using the index, to ensure the index doesn't
2386 if not hasattr(self, 'index'):
2387 self.index = {"dn": "@INDEXLIST",
2388 "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
2390 super(IndexedAddModifyTests, self).setUp()
2392 def test_duplicate_GUID(self):
2394 self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
2397 "objectUUID": b"0123456789abcdef"})
2398 self.fail("Should have failed adding duplicate GUID")
2399 except ldb.LdbError as err:
2401 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2403 def test_duplicate_name_dup_GUID(self):
2404 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2407 "objectUUID": b"a123456789abcdef"})
2409 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2412 "objectUUID": b"a123456789abcdef"})
2413 self.fail("Should have failed adding duplicate GUID")
2414 except ldb.LdbError as err:
2416 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2418 def test_duplicate_name_dup_GUID2(self):
2419 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2422 "objectUUID": b"abc3456789abcdef"})
2424 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2427 "objectUUID": b"aaa3456789abcdef"})
2428 self.fail("Should have failed adding duplicate DN")
2429 except ldb.LdbError as err:
2431 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2433 # Checking the GUID didn't stick in the index
2434 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2437 "objectUUID": b"aaa3456789abcdef"})
2439 def test_add_dup_guid_add(self):
2440 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2443 "objectUUID": b"0123456789abcde1"})
2445 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2448 "objectUUID": b"0123456789abcde1"})
2449 self.fail("Should have failed on duplicate GUID")
2451 except ldb.LdbError as err:
2453 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2455 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2458 "objectUUID": b"0123456789abcde2"})
2460 def test_duplicate_index_values(self):
2461 self.l.add({"dn": "OU=DIV1,DC=SAMBA,DC=ORG",
2464 "objectUUID": b"0123456789abcdff"})
2465 self.l.add({"dn": "OU=DIV2,DC=SAMBA,DC=ORG",
2468 "objectUUID": b"0123456789abcdfd"})
2471 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
2472 """Test searches using the index, to ensure the index doesn't
2476 self.index = {"dn": "@INDEXLIST",
2477 "@IDXATTR": [b"x", b"y", b"ou"],
2479 "@IDXGUID": [b"objectUUID"],
2480 "@IDX_DN_GUID": [b"GUID"]}
2481 super(GUIDIndexedAddModifyTests, self).setUp()
2484 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
2485 """Test GUID index behaviour insdie the transaction"""
2488 super(GUIDTransIndexedAddModifyTests, self).setUp()
2489 self.l.transaction_start()
2492 self.l.transaction_commit()
2493 super(GUIDTransIndexedAddModifyTests, self).tearDown()
2496 class TransIndexedAddModifyTests(IndexedAddModifyTests):
2497 """Test index behaviour insdie the transaction"""
2500 super(TransIndexedAddModifyTests, self).setUp()
2501 self.l.transaction_start()
2504 self.l.transaction_commit()
2505 super(TransIndexedAddModifyTests, self).tearDown()
2508 class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
2511 if os.environ.get('HAVE_LMDB', '1') == '0':
2512 self.skipTest("No lmdb backend")
2513 self.prefix = MDB_PREFIX
2514 super(GuidIndexedAddModifyTestsLmdb, self).setUp()
2517 super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
2520 class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
2523 if os.environ.get('HAVE_LMDB', '1') == '0':
2524 self.skipTest("No lmdb backend")
2525 self.prefix = MDB_PREFIX
2526 super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
2529 super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
2532 class BadIndexTests(LdbBaseTest):
2534 super(BadIndexTests, self).setUp()
2535 self.testdir = tempdir()
2536 self.filename = os.path.join(self.testdir, "test.ldb")
2537 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
2538 if hasattr(self, 'IDXGUID'):
2539 self.ldb.add({"dn": "@INDEXLIST",
2540 "@IDXATTR": [b"x", b"y", b"ou"],
2541 "@IDXGUID": [b"objectUUID"],
2542 "@IDX_DN_GUID": [b"GUID"]})
2544 self.ldb.add({"dn": "@INDEXLIST",
2545 "@IDXATTR": [b"x", b"y", b"ou"]})
2547 super(BadIndexTests, self).setUp()
2549 def test_unique(self):
2550 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2551 "objectUUID": b"0123456789abcde1",
2553 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2554 "objectUUID": b"0123456789abcde2",
2556 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2557 "objectUUID": b"0123456789abcde3",
2560 res = self.ldb.search(expression="(y=1)",
2561 base="dc=samba,dc=org")
2562 self.assertEqual(len(res), 3)
2564 # Now set this to unique index, but forget to check the result
2566 self.ldb.add({"dn": "@ATTRIBUTES",
2567 "y": "UNIQUE_INDEX"})
2569 except ldb.LdbError:
2572 # We must still have a working index
2573 res = self.ldb.search(expression="(y=1)",
2574 base="dc=samba,dc=org")
2575 self.assertEqual(len(res), 3)
2577 def test_unique_transaction(self):
2578 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2579 "objectUUID": b"0123456789abcde1",
2581 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2582 "objectUUID": b"0123456789abcde2",
2584 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2585 "objectUUID": b"0123456789abcde3",
2588 res = self.ldb.search(expression="(y=1)",
2589 base="dc=samba,dc=org")
2590 self.assertEqual(len(res), 3)
2592 self.ldb.transaction_start()
2594 # Now set this to unique index, but forget to check the result
2596 self.ldb.add({"dn": "@ATTRIBUTES",
2597 "y": "UNIQUE_INDEX"})
2598 except ldb.LdbError:
2602 self.ldb.transaction_commit()
2605 except ldb.LdbError as err:
2607 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2609 # We must still have a working index
2610 res = self.ldb.search(expression="(y=1)",
2611 base="dc=samba,dc=org")
2613 self.assertEqual(len(res), 3)
2615 def test_casefold(self):
2616 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2617 "objectUUID": b"0123456789abcde1",
2619 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2620 "objectUUID": b"0123456789abcde2",
2622 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2623 "objectUUID": b"0123456789abcde3",
2626 res = self.ldb.search(expression="(y=a)",
2627 base="dc=samba,dc=org")
2628 self.assertEqual(len(res), 2)
2630 self.ldb.add({"dn": "@ATTRIBUTES",
2631 "y": "CASE_INSENSITIVE"})
2633 # We must still have a working index
2634 res = self.ldb.search(expression="(y=a)",
2635 base="dc=samba,dc=org")
2637 if hasattr(self, 'IDXGUID'):
2638 self.assertEqual(len(res), 3)
2640 # We should not return this entry twice, but sadly
2641 # we have not yet fixed
2642 # https://bugzilla.samba.org/show_bug.cgi?id=13361
2643 self.assertEqual(len(res), 4)
2645 def test_casefold_transaction(self):
2646 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2647 "objectUUID": b"0123456789abcde1",
2649 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2650 "objectUUID": b"0123456789abcde2",
2652 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2653 "objectUUID": b"0123456789abcde3",
2656 res = self.ldb.search(expression="(y=a)",
2657 base="dc=samba,dc=org")
2658 self.assertEqual(len(res), 2)
2660 self.ldb.transaction_start()
2662 self.ldb.add({"dn": "@ATTRIBUTES",
2663 "y": "CASE_INSENSITIVE"})
2665 self.ldb.transaction_commit()
2667 # We must still have a working index
2668 res = self.ldb.search(expression="(y=a)",
2669 base="dc=samba,dc=org")
2671 if hasattr(self, 'IDXGUID'):
2672 self.assertEqual(len(res), 3)
2674 # We should not return this entry twice, but sadly
2675 # we have not yet fixed
2676 # https://bugzilla.samba.org/show_bug.cgi?id=13361
2677 self.assertEqual(len(res), 4)
2679 def test_modify_transaction(self):
2680 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2681 "objectUUID": b"0123456789abcde1",
2685 res = self.ldb.search(expression="(y=2)",
2686 base="dc=samba,dc=org")
2687 self.assertEqual(len(res), 1)
2689 self.ldb.add({"dn": "@ATTRIBUTES",
2690 "y": "UNIQUE_INDEX"})
2692 self.ldb.transaction_start()
2695 m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2696 m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2697 m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2703 except ldb.LdbError as err:
2705 self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2708 self.ldb.transaction_commit()
2709 # We should fail here, but we want to be sure
2712 except ldb.LdbError as err:
2714 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2716 # The index should still be pointing to x=y
2717 res = self.ldb.search(expression="(y=2)",
2718 base="dc=samba,dc=org")
2719 self.assertEqual(len(res), 1)
2722 self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
2723 "objectUUID": b"0123456789abcde2",
2725 self.fail("Added unique attribute twice")
2726 except ldb.LdbError as err:
2728 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2730 res = self.ldb.search(expression="(y=2)",
2731 base="dc=samba,dc=org")
2732 self.assertEqual(len(res), 1)
2733 self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
2736 super(BadIndexTests, self).tearDown()
2739 class GUIDBadIndexTests(BadIndexTests):
2740 """Test Bad index things with GUID index mode"""
2745 super(GUIDBadIndexTests, self).setUp()
2748 class GUIDBadIndexTestsLmdb(BadIndexTests):
2751 if os.environ.get('HAVE_LMDB', '1') == '0':
2752 self.skipTest("No lmdb backend")
2753 self.prefix = MDB_PREFIX
2754 self.index = MDB_INDEX_OBJ
2756 super(GUIDBadIndexTestsLmdb, self).setUp()
2759 super(GUIDBadIndexTestsLmdb, self).tearDown()
2762 class BatchModeTests(LdbBaseTest):
2765 super(BatchModeTests, self).setUp()
2766 self.testdir = tempdir()
2767 self.filename = os.path.join(self.testdir, "test.ldb")
2768 self.ldb = ldb.Ldb(self.url(),
2770 options=["batch_mode:1"])
2771 if hasattr(self, 'IDXGUID'):
2772 self.ldb.add({"dn": "@INDEXLIST",
2773 "@IDXATTR": [b"x", b"y", b"ou"],
2774 "@IDXGUID": [b"objectUUID"],
2775 "@IDX_DN_GUID": [b"GUID"]})
2777 self.ldb.add({"dn": "@INDEXLIST",
2778 "@IDXATTR": [b"x", b"y", b"ou"]})
2780 def test_modify_transaction(self):
2781 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2782 "objectUUID": b"0123456789abcde1",
2786 res = self.ldb.search(expression="(y=2)",
2787 base="dc=samba,dc=org")
2788 self.assertEqual(len(res), 1)
2790 self.ldb.add({"dn": "@ATTRIBUTES",
2791 "y": "UNIQUE_INDEX"})
2793 self.ldb.transaction_start()
2796 m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2797 m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2798 m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2804 except ldb.LdbError as err:
2806 self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2809 self.ldb.transaction_commit()
2810 self.fail("Commit should have failed as we were in batch mode")
2811 except ldb.LdbError as err:
2813 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2816 super(BatchModeTests, self).tearDown()
2819 class DnTests(TestCase):
2822 super(DnTests, self).setUp()
2823 self.ldb = ldb.Ldb()
2826 super(DnTests, self).tearDown()
2829 def test_set_dn_invalid(self):
2834 self.assertRaises(TypeError, assign)
2837 x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2838 y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2839 self.assertEqual(x, y)
2840 y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
2841 self.assertNotEqual(x, y)
2844 x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
2845 self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
2847 def test_repr(self):
2848 x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
2849 self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
2851 def test_get_casefold_2(self):
2852 x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
2853 self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
2855 def test_validate(self):
2856 x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
2857 self.assertTrue(x.validate())
2859 def test_parent(self):
2860 x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
2861 self.assertEqual("bar=bloe", x.parent().__str__())
2863 def test_parent_nonexistent(self):
2864 x = ldb.Dn(self.ldb, "@BLA")
2865 self.assertEqual(None, x.parent())
2867 def test_is_valid(self):
2868 x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
2869 self.assertTrue(x.is_valid())
2870 x = ldb.Dn(self.ldb, "")
2871 self.assertTrue(x.is_valid())
2873 def test_is_special(self):
2874 x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
2875 self.assertFalse(x.is_special())
2876 x = ldb.Dn(self.ldb, "@FOOBAR")
2877 self.assertTrue(x.is_special())
2879 def test_check_special(self):
2880 x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
2881 self.assertFalse(x.check_special("FOOBAR"))
2882 x = ldb.Dn(self.ldb, "@FOOBAR")
2883 self.assertTrue(x.check_special("@FOOBAR"))
2886 x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
2887 self.assertEqual(2, len(x))
2888 x = ldb.Dn(self.ldb, "dc=foo21")
2889 self.assertEqual(1, len(x))
2891 def test_add_child(self):
2892 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2893 self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
2894 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2896 def test_add_base(self):
2897 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2898 base = ldb.Dn(self.ldb, "bla=bloe")
2899 self.assertTrue(x.add_base(base))
2900 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2902 def test_add_child_str(self):
2903 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2904 self.assertTrue(x.add_child("bla=bloe"))
2905 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2907 def test_add_base_str(self):
2908 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2910 self.assertTrue(x.add_base(base))
2911 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2914 x = ldb.Dn(self.ldb, "dc=foo24")
2915 y = ldb.Dn(self.ldb, "bar=bla")
2916 self.assertEqual("dc=foo24,bar=bla", str(x + y))
2918 def test_remove_base_components(self):
2919 x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
2920 x.remove_base_components(len(x) - 1)
2921 self.assertEqual("dc=foo24", str(x))
2923 def test_parse_ldif(self):
2924 msgs = self.ldb.parse_ldif("dn: foo=bar\n")
2926 self.assertEqual("foo=bar", str(msg[1].dn))
2927 self.assertTrue(isinstance(msg[1], ldb.Message))
2928 ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
2929 self.assertEqual("dn: foo=bar\n\n", ldif)
2931 def test_parse_ldif_more(self):
2932 msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
2934 self.assertEqual("foo=bar", str(msg[1].dn))
2936 self.assertEqual("bar=bar", str(msg[1].dn))
2938 def test_print_ldif(self):
2939 ldif = '''dn: dc=foo27
2943 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2944 self.msg["foo"] = [b"foo"]
2945 self.assertEqual(ldif,
2946 self.ldb.write_ldif(self.msg,
2947 ldb.CHANGETYPE_NONE))
2949 def test_print_ldif_binary(self):
2950 # this also confirms that ldb flags are set even without a URL)
2951 self.ldb = ldb.Ldb(flags=ldb.FLG_SHOW_BINARY)
2952 ldif = '''dn: dc=foo27
2957 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2958 self.msg["foo"] = ["f\nöö"]
2959 self.assertEqual(ldif,
2960 self.ldb.write_ldif(self.msg,
2961 ldb.CHANGETYPE_NONE))
2964 def test_print_ldif_no_base64_bad(self):
2965 ldif = '''dn: dc=foo27
2970 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2971 self.msg["foo"] = ["f\nöö"]
2972 self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2973 self.assertEqual(ldif,
2974 self.ldb.write_ldif(self.msg,
2975 ldb.CHANGETYPE_NONE))
2977 def test_print_ldif_no_base64_good(self):
2978 ldif = '''dn: dc=foo27
2982 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2983 self.msg["foo"] = ["föö"]
2984 self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2985 self.assertEqual(ldif,
2986 self.ldb.write_ldif(self.msg,
2987 ldb.CHANGETYPE_NONE))
2989 def test_canonical_string(self):
2990 x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
2991 self.assertEqual("/bloe/foo25", x.canonical_str())
2993 def test_canonical_ex_string(self):
2994 x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
2995 self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
2997 def test_ldb_is_child_of(self):
2998 """Testing ldb_dn_compare_dn"""
2999 dn1 = ldb.Dn(self.ldb, "dc=base")
3000 dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
3001 dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
3002 dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
3004 self.assertTrue(dn1.is_child_of(dn1))
3005 self.assertTrue(dn2.is_child_of(dn1))
3006 self.assertTrue(dn4.is_child_of(dn1))
3007 self.assertTrue(dn4.is_child_of(dn3))
3008 self.assertTrue(dn4.is_child_of(dn4))
3009 self.assertFalse(dn3.is_child_of(dn2))
3010 self.assertFalse(dn1.is_child_of(dn4))
3012 def test_ldb_is_child_of_str(self):
3013 """Testing ldb_dn_compare_dn"""
3015 dn2_str = "cn=foo,dc=base"
3016 dn3_str = "cn=bar,dc=base"
3017 dn4_str = "cn=baz,cn=bar,dc=base"
3019 dn1 = ldb.Dn(self.ldb, dn1_str)
3020 dn2 = ldb.Dn(self.ldb, dn2_str)
3021 dn3 = ldb.Dn(self.ldb, dn3_str)
3022 dn4 = ldb.Dn(self.ldb, dn4_str)
3024 self.assertTrue(dn1.is_child_of(dn1_str))
3025 self.assertTrue(dn2.is_child_of(dn1_str))
3026 self.assertTrue(dn4.is_child_of(dn1_str))
3027 self.assertTrue(dn4.is_child_of(dn3_str))
3028 self.assertTrue(dn4.is_child_of(dn4_str))
3029 self.assertFalse(dn3.is_child_of(dn2_str))
3030 self.assertFalse(dn1.is_child_of(dn4_str))
3032 def test_get_component_name(self):
3033 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3034 self.assertEqual(dn.get_component_name(0), 'cn')
3035 self.assertEqual(dn.get_component_name(1), 'dc')
3036 self.assertEqual(dn.get_component_name(2), None)
3037 self.assertEqual(dn.get_component_name(-1), None)
3039 def test_get_component_value(self):
3040 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3041 self.assertEqual(dn.get_component_value(0), 'foo')
3042 self.assertEqual(dn.get_component_value(1), 'base')
3043 self.assertEqual(dn.get_component_name(2), None)
3044 self.assertEqual(dn.get_component_name(-1), None)
3046 def test_set_component(self):
3047 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3048 dn.set_component(0, 'cn', 'bar')
3049 self.assertEqual(str(dn), "cn=bar,dc=base")
3050 dn.set_component(1, 'o', 'asep')
3051 self.assertEqual(str(dn), "cn=bar,o=asep")
3052 self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
3053 self.assertEqual(str(dn), "cn=bar,o=asep")
3054 dn.set_component(1, 'o', 'a,b+c')
3055 self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
3057 def test_set_component_bytes(self):
3058 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3059 dn.set_component(0, 'cn', b'bar')
3060 self.assertEqual(str(dn), "cn=bar,dc=base")
3061 dn.set_component(1, 'o', b'asep')
3062 self.assertEqual(str(dn), "cn=bar,o=asep")
3064 def test_set_component_none(self):
3065 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
3066 self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
3068 def test_get_extended_component_null(self):
3069 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
3070 self.assertEqual(dn.get_extended_component("TEST"), None)
3072 def test_get_extended_component(self):
3073 self.ldb._register_test_extensions()
3074 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
3075 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
3077 def test_set_extended_component(self):
3078 self.ldb._register_test_extensions()
3079 dn = ldb.Dn(self.ldb, "dc=base")
3080 dn.set_extended_component("TEST", "foo")
3081 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
3082 dn.set_extended_component("TEST", b"bar")
3083 self.assertEqual(dn.get_extended_component("TEST"), b"bar")
3085 def test_extended_str(self):
3086 self.ldb._register_test_extensions()
3087 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
3088 self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
3090 def test_get_rdn_name(self):
3091 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3092 self.assertEqual(dn.get_rdn_name(), 'cn')
3094 def test_get_rdn_value(self):
3095 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3096 self.assertEqual(dn.get_rdn_value(), 'foo')
3098 def test_get_casefold(self):
3099 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3100 self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
3102 def test_get_linearized(self):
3103 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3104 self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
3106 def test_is_null(self):
3107 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3108 self.assertFalse(dn.is_null())
3110 dn = ldb.Dn(self.ldb, '')
3111 self.assertTrue(dn.is_null())
3114 class LdbMsgTests(TestCase):
3117 super(LdbMsgTests, self).setUp()
3118 self.msg = ldb.Message()
3120 def test_init_dn(self):
3121 self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
3122 self.assertEqual("dc=foo27", str(self.msg.dn))
3124 def test_iter_items(self):
3125 self.assertEqual(0, len(self.msg.items()))
3126 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
3127 self.assertEqual(1, len(self.msg.items()))
3129 def test_items(self):
3130 self.msg["foo"] = ["foo"]
3131 self.msg["bar"] = ["bar"]
3133 items = self.msg.items()
3136 self.assertEqual([("foo", ldb.MessageElement(["foo"])),
3137 ("bar", ldb.MessageElement(["bar"]))],
3140 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=test")
3142 items = self.msg.items()
3145 self.assertEqual([("dn", ldb.Dn(ldb.Ldb(), "dc=test")),
3146 ("foo", ldb.MessageElement(["foo"])),
3147 ("bar", ldb.MessageElement(["bar"]))],
3150 def test_repr(self):
3151 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
3152 self.msg["dc"] = b"foo"
3153 self.assertIn(repr(self.msg), [
3154 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
3155 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
3157 self.assertIn(repr(self.msg.text), [
3158 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
3159 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
3163 self.assertEqual(0, len(self.msg))
3165 def test_notpresent(self):
3166 self.assertRaises(KeyError, lambda: self.msg["foo"])
3168 def test_invalid(self):
3170 self.assertRaises(TypeError, lambda: self.msg[42])
3178 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
3180 def test_add_text(self):
3181 self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
3183 def test_elements_empty(self):
3184 self.assertEqual([], self.msg.elements())
3186 def test_elements(self):
3187 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
3189 self.assertEqual([el], self.msg.elements())
3190 self.assertEqual([el.text], self.msg.text.elements())
3192 def test_add_value(self):
3193 self.assertEqual(0, len(self.msg))
3194 self.msg["foo"] = [b"foo"]
3195 self.assertEqual(1, len(self.msg))
3197 def test_add_value_text(self):
3198 self.assertEqual(0, len(self.msg))
3199 self.msg["foo"] = ["foo"]
3200 self.assertEqual(1, len(self.msg))
3202 def test_add_value_multiple(self):
3203 self.assertEqual(0, len(self.msg))
3204 self.msg["foo"] = [b"foo", b"bla"]
3205 self.assertEqual(1, len(self.msg))
3206 self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
3208 def test_add_value_multiple_text(self):
3209 self.assertEqual(0, len(self.msg))
3210 self.msg["foo"] = ["foo", "bla"]
3211 self.assertEqual(1, len(self.msg))
3212 self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
3214 def test_set_value(self):
3215 self.msg["foo"] = [b"fool"]
3216 self.assertEqual([b"fool"], list(self.msg["foo"]))
3217 self.msg["foo"] = [b"bar"]
3218 self.assertEqual([b"bar"], list(self.msg["foo"]))
3220 def test_set_value_text(self):
3221 self.msg["foo"] = ["fool"]
3222 self.assertEqual(["fool"], list(self.msg.text["foo"]))
3223 self.msg["foo"] = ["bar"]
3224 self.assertEqual(["bar"], list(self.msg.text["foo"]))
3226 def test_keys(self):
3227 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3228 self.msg["foo"] = [b"bla"]
3229 self.msg["bar"] = [b"bla"]
3230 self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
3232 def test_keys_text(self):
3233 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3234 self.msg["foo"] = ["bla"]
3235 self.msg["bar"] = ["bla"]
3236 self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
3239 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3240 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
3242 def test_get_dn(self):
3243 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3244 self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
3246 def test_dn_text(self):
3247 self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3248 self.assertEqual("@BASEINFO", str(self.msg.dn))
3249 self.assertEqual("@BASEINFO", str(self.msg.text.dn))
3251 def test_get_dn_text(self):
3252 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3253 self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
3254 self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
3256 def test_get_invalid(self):
3257 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3258 self.assertRaises(TypeError, self.msg.get, 42)
3260 def test_get_other(self):
3261 self.msg["foo"] = [b"bar"]
3262 self.assertEqual(b"bar", self.msg.get("foo")[0])
3263 self.assertEqual(b"bar", self.msg.get("foo", idx=0))
3264 self.assertEqual(None, self.msg.get("foo", idx=1))
3265 self.assertEqual("", self.msg.get("foo", default='', idx=1))
3267 def test_get_other_text(self):
3268 self.msg["foo"] = ["bar"]
3269 self.assertEqual(["bar"], list(self.msg.text.get("foo")))
3270 self.assertEqual("bar", self.msg.text.get("foo")[0])
3271 self.assertEqual("bar", self.msg.text.get("foo", idx=0))
3272 self.assertEqual(None, self.msg.get("foo", idx=1))
3273 self.assertEqual("", self.msg.get("foo", default='', idx=1))
3275 def test_get_default(self):
3276 self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
3277 self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
3279 def test_get_default_text(self):
3280 self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
3281 self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
3283 def test_get_unknown(self):
3284 self.assertEqual(None, self.msg.get("lalalala"))
3286 def test_get_unknown_text(self):
3287 self.assertEqual(None, self.msg.text.get("lalalala"))
3289 def test_contains(self):
3290 self.msg['foo'] = ['bar']
3291 self.assertIn('foo', self.msg)
3293 self.msg['Foo'] = ['bar']
3294 self.assertIn('Foo', self.msg)
3296 def test_contains_case(self):
3297 self.msg['foo'] = ['bar']
3298 self.assertIn('Foo', self.msg)
3300 self.msg['Foo'] = ['bar']
3301 self.assertIn('foo', self.msg)
3303 def test_contains_dn(self):
3304 self.assertIn('dn', self.msg)
3306 def test_contains_dn_case(self):
3307 self.assertIn('DN', self.msg)
3309 def test_contains_invalid(self):
3310 self.assertRaises(TypeError, lambda: None in self.msg)
3312 def test_msg_diff(self):
3314 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
3315 msg1 = next(msgs)[1]
3316 msg2 = next(msgs)[1]
3317 msgdiff = l.msg_diff(msg1, msg2)
3318 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
3319 self.assertRaises(KeyError, lambda: msgdiff["foo"])
3320 self.assertEqual(1, len(msgdiff))
3322 def test_equal_empty(self):
3323 msg1 = ldb.Message()
3324 msg2 = ldb.Message()
3325 self.assertEqual(msg1, msg2)
3327 def test_equal_simplel(self):
3329 msg1 = ldb.Message()
3330 msg1.dn = ldb.Dn(db, "foo=bar")
3331 msg2 = ldb.Message()
3332 msg2.dn = ldb.Dn(db, "foo=bar")
3333 self.assertEqual(msg1, msg2)
3334 msg1['foo'] = b'bar'
3335 msg2['foo'] = b'bar'
3336 self.assertEqual(msg1, msg2)
3337 msg2['foo'] = b'blie'
3338 self.assertNotEqual(msg1, msg2)
3339 msg2['foo'] = b'blie'
3341 def test_from_dict(self):
3342 rec = {"dn": "dc=fromdict",
3343 "a1": [b"a1-val1", b"a1-val1"]}
3345 # check different types of input Flags
3346 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
3347 m = ldb.Message.from_dict(l, rec, flags)
3348 self.assertEqual(rec["a1"], list(m["a1"]))
3349 self.assertEqual(flags, m["a1"].flags())
3350 # check input params
3351 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
3352 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
3353 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
3354 # Message.from_dict expects dictionary with 'dn'
3355 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
3356 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
3358 def test_from_dict_text(self):
3359 rec = {"dn": "dc=fromdict",
3360 "a1": ["a1-val1", "a1-val1"]}
3362 # check different types of input Flags
3363 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
3364 m = ldb.Message.from_dict(l, rec, flags)
3365 self.assertEqual(rec["a1"], list(m.text["a1"]))
3366 self.assertEqual(flags, m.text["a1"].flags())
3367 # check input params
3368 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
3369 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
3370 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
3371 # Message.from_dict expects dictionary with 'dn'
3372 err_rec = {"a1": ["a1-val1", "a1-val1"]}
3373 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
3375 def test_copy_add_message_element(self):
3377 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
3378 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
3382 self.assertEqual(mto["1"], m["1"])
3383 self.assertEqual(mto["2"], m["2"])
3387 self.assertEqual(mto["1"], m["1"])
3388 self.assertEqual(mto["2"], m["2"])
3390 def test_copy_add_message_element_text(self):
3392 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
3393 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
3397 self.assertEqual(mto["1"], m.text["1"])
3398 self.assertEqual(mto["2"], m.text["2"])
3402 self.assertEqual(mto.text["1"], m.text["1"])
3403 self.assertEqual(mto.text["2"], m.text["2"])
3404 self.assertEqual(mto["1"], m["1"])
3405 self.assertEqual(mto["2"], m["2"])
3408 class MessageElementTests(TestCase):
3410 def test_cmp_element(self):
3411 x = ldb.MessageElement([b"foo"])
3412 y = ldb.MessageElement([b"foo"])
3413 z = ldb.MessageElement([b"bzr"])
3414 self.assertEqual(x, y)
3415 self.assertNotEqual(x, z)
3417 def test_cmp_element_text(self):
3418 x = ldb.MessageElement([b"foo"])
3419 y = ldb.MessageElement(["foo"])
3420 self.assertEqual(x, y)
3422 def test_create_iterable(self):
3423 x = ldb.MessageElement([b"foo"])
3424 self.assertEqual([b"foo"], list(x))
3425 self.assertEqual(["foo"], list(x.text))
3427 def test_repr(self):
3428 x = ldb.MessageElement([b"foo"])
3429 self.assertEqual("MessageElement([b'foo'])", repr(x))
3430 self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
3431 x = ldb.MessageElement([b"foo", b"bla"])
3432 self.assertEqual(2, len(x))
3433 self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
3434 self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
3436 def test_get_item(self):
3437 x = ldb.MessageElement([b"foo", b"bar"])
3438 self.assertEqual(b"foo", x[0])
3439 self.assertEqual(b"bar", x[1])
3440 self.assertEqual(b"bar", x[-1])
3441 self.assertRaises(IndexError, lambda: x[45])
3443 def test_get_item_text(self):
3444 x = ldb.MessageElement(["foo", "bar"])
3445 self.assertEqual("foo", x.text[0])
3446 self.assertEqual("bar", x.text[1])
3447 self.assertEqual("bar", x.text[-1])
3448 self.assertRaises(IndexError, lambda: x[45])
3451 x = ldb.MessageElement([b"foo", b"bar"])
3452 self.assertEqual(2, len(x))
3455 x = ldb.MessageElement([b"foo", b"bar"])
3456 y = ldb.MessageElement([b"foo", b"bar"])
3457 self.assertEqual(y, x)
3458 x = ldb.MessageElement([b"foo"])
3459 self.assertNotEqual(y, x)
3460 y = ldb.MessageElement([b"foo"])
3461 self.assertEqual(y, x)
3463 def test_extended(self):
3464 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
3465 self.assertEqual("MessageElement([b'456'])", repr(el))
3466 self.assertEqual("MessageElement([b'456']).text", repr(el.text))
3468 def test_bad_text(self):
3469 el = ldb.MessageElement(b'\xba\xdd')
3470 self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
3473 class LdbResultTests(LdbBaseTest):
3476 super(LdbResultTests, self).setUp()
3477 self.testdir = tempdir()
3478 self.filename = os.path.join(self.testdir, "test.ldb")
3479 self.l = ldb.Ldb(self.url(), flags=self.flags())
3481 self.l.add(self.index)
3482 except AttributeError:
3484 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
3485 "objectUUID": b"0123456789abcde0"})
3486 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
3487 "objectUUID": b"0123456789abcde1"})
3488 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
3489 "objectUUID": b"0123456789abcde2"})
3490 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
3491 "objectUUID": b"0123456789abcde3"})
3492 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
3493 "objectUUID": b"0123456789abcde4"})
3494 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
3495 "objectUUID": b"0123456789abcde5"})
3496 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
3497 "objectUUID": b"0123456789abcde6"})
3498 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
3499 "objectUUID": b"0123456789abcde7"})
3500 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
3501 "objectUUID": b"0123456789abcde8"})
3502 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
3503 "objectUUID": b"0123456789abcde9"})
3504 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
3505 "objectUUID": b"0123456789abcdea"})
3506 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
3507 "objectUUID": b"0123456789abcdeb"})
3508 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
3509 "objectUUID": b"0123456789abcdec"})
3512 shutil.rmtree(self.testdir)
3513 super(LdbResultTests, self).tearDown()
3514 # Ensure the LDB is closed now, so we close the FD
3517 def test_return_type(self):
3518 res = self.l.search()
3519 self.assertEqual(str(res), "<ldb result>")
3521 def test_get_msgs(self):
3522 res = self.l.search()
3525 def test_get_controls(self):
3526 res = self.l.search()
3529 def test_get_referals(self):
3530 res = self.l.search()
3533 def test_iter_msgs(self):
3535 for l in self.l.search().msgs:
3536 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3538 self.assertTrue(found)
3540 def test_iter_msgs_count(self):
3541 self.assertTrue(self.l.search().count > 0)
3542 # 13 objects has been added to the DC=SAMBA, DC=ORG
3543 self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
3545 def test_iter_controls(self):
3546 res = self.l.search().controls
3549 def test_create_control(self):
3550 self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
3551 c = ldb.Control(self.l, "relax:1")
3552 self.assertEqual(c.critical, True)
3553 self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
3555 def test_iter_refs(self):
3556 res = self.l.search().referals
3559 def test_search_sequence_msgs(self):
3561 res = self.l.search().msgs
3563 for i in range(0, len(res)):
3565 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3567 self.assertTrue(found)
3569 def test_search_as_iter(self):
3571 res = self.l.search()
3574 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3576 self.assertTrue(found)
3578 def test_search_iter(self):
3580 res = self.l.search_iterator()
3583 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3585 self.assertTrue(found)
3587 # Show that search results can't see into a transaction
3589 def test_search_against_trans(self):
3592 (r1, w1) = os.pipe()
3594 (r2, w2) = os.pipe()
3596 # For the first element, fork a child that will
3600 # In the child, re-open
3604 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3605 # start a transaction
3606 child_ldb.transaction_start()
3609 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3610 "name": b"samba.org",
3611 "objectUUID": b"o123456789acbdef"})
3613 os.write(w1, b"added")
3615 # Now wait for the search to be done
3620 child_ldb.transaction_commit()
3621 except ldb.LdbError as err:
3622 # We print this here to see what went wrong in the child
3626 os.write(w1, b"transaction")
3629 self.assertEqual(os.read(r1, 5), b"added")
3631 # This should not turn up until the transaction is concluded
3632 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3633 scope=ldb.SCOPE_BASE)
3634 self.assertEqual(len(res11), 0)
3636 os.write(w2, b"search")
3638 # Now wait for the transaction to be done. This should
3639 # deadlock, but the search doesn't hold a read lock for the
3640 # iterator lifetime currently.
3641 self.assertEqual(os.read(r1, 11), b"transaction")
3643 # This should now turn up, as the transaction is over
3644 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3645 scope=ldb.SCOPE_BASE)
3646 self.assertEqual(len(res11), 1)
3648 self.assertFalse(found11)
3650 (got_pid, status) = os.waitpid(pid, 0)
3651 self.assertEqual(got_pid, pid)
3653 def test_search_iter_against_trans(self):
3657 # We need to hold this iterator open to hold the all-record
3659 res = self.l.search_iterator()
3661 (r1, w1) = os.pipe()
3663 (r2, w2) = os.pipe()
3665 # For the first element, with the sequence open (which
3666 # means with ldb locks held), fork a child that will
3670 # In the child, re-open
3675 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3676 # start a transaction
3677 child_ldb.transaction_start()
3680 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3681 "name": b"samba.org",
3682 "objectUUID": b"o123456789acbdef"})
3684 os.write(w1, b"added")
3686 # Now wait for the search to be done
3691 child_ldb.transaction_commit()
3692 except ldb.LdbError as err:
3693 # We print this here to see what went wrong in the child
3697 os.write(w1, b"transaction")
3700 self.assertEqual(os.read(r1, 5), b"added")
3702 # This should not turn up until the transaction is concluded
3703 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3704 scope=ldb.SCOPE_BASE)
3705 self.assertEqual(len(res11), 0)
3707 os.write(w2, b"search")
3709 # allow the transaction to start
3712 # This should not turn up until the search finishes and
3713 # removed the read lock, but for ldb_tdb that happened as soon
3714 # as we called the first res.next()
3715 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3716 scope=ldb.SCOPE_BASE)
3717 self.assertEqual(len(res11), 0)
3719 # These results are all collected at the first next(res) call
3721 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3723 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
3726 # Now wait for the transaction to be done.
3727 self.assertEqual(os.read(r1, 11), b"transaction")
3729 # This should now turn up, as the transaction is over and all
3730 # read locks are gone
3731 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3732 scope=ldb.SCOPE_BASE)
3733 self.assertEqual(len(res11), 1)
3735 self.assertTrue(found)
3736 self.assertFalse(found11)
3738 (got_pid, status) = os.waitpid(pid, 0)
3739 self.assertEqual(got_pid, pid)
3742 class LdbResultTestsLmdb(LdbResultTests):
3745 if os.environ.get('HAVE_LMDB', '1') == '0':
3746 self.skipTest("No lmdb backend")
3747 self.prefix = MDB_PREFIX
3748 self.index = MDB_INDEX_OBJ
3749 super(LdbResultTestsLmdb, self).setUp()
3752 super(LdbResultTestsLmdb, self).tearDown()
3755 class BadTypeTests(TestCase):
3756 def test_control(self):
3758 self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
3759 self.assertRaises(TypeError, ldb.Control, ldb, 1234)
3761 def test_modify(self):
3763 dn = ldb.Dn(l, 'a=b')
3765 self.assertRaises(TypeError, l.modify, '<bad type>')
3766 self.assertRaises(TypeError, l.modify, m, '<bad type>')
3770 dn = ldb.Dn(l, 'a=b')
3772 self.assertRaises(TypeError, l.add, '<bad type>')
3773 self.assertRaises(TypeError, l.add, m, '<bad type>')
3775 def test_delete(self):
3777 dn = ldb.Dn(l, 'a=b')
3778 self.assertRaises(TypeError, l.add, '<bad type>')
3779 self.assertRaises(TypeError, l.add, dn, '<bad type>')
3781 def test_rename(self):
3783 dn = ldb.Dn(l, 'a=b')
3784 self.assertRaises(TypeError, l.add, '<bad type>', dn)
3785 self.assertRaises(TypeError, l.add, dn, '<bad type>')
3786 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
3788 def test_search(self):
3790 self.assertRaises(TypeError, l.search, base=1234)
3791 self.assertRaises(TypeError, l.search, scope='<bad type>')
3792 self.assertRaises(TypeError, l.search, expression=1234)
3793 self.assertRaises(TypeError, l.search, attrs='<bad type>')
3794 self.assertRaises(TypeError, l.search, controls='<bad type>')
3797 class VersionTests(TestCase):
3799 def test_version(self):
3800 self.assertTrue(isinstance(ldb.__version__, str))
3802 class NestedTransactionTests(LdbBaseTest):
3804 super(NestedTransactionTests, self).setUp()
3805 self.testdir = tempdir()
3806 self.filename = os.path.join(self.testdir, "test.ldb")
3807 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
3808 self.ldb.add({"dn": "@INDEXLIST",
3809 "@IDXATTR": [b"x", b"y", b"ou"],
3810 "@IDXGUID": [b"objectUUID"],
3811 "@IDX_DN_GUID": [b"GUID"]})
3813 super(NestedTransactionTests, self).setUp()
3816 # This test documents that currently ldb does not support true nested
3819 # Note: The test is written so that it treats failure as pass.
3820 # It is done this way as standalone ldb builds do not use the samba
3821 # known fail mechanism
3823 def test_nested_transactions(self):
3825 self.ldb.transaction_start()
3827 self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
3828 "objectUUID": b"0123456789abcde1"})
3829 res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3830 base="dc=samba,dc=org")
3831 self.assertEqual(len(res), 1)
3833 self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
3834 "objectUUID": b"0123456789abcde2"})
3835 res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3836 base="dc=samba,dc=org")
3837 self.assertEqual(len(res), 1)
3839 self.ldb.transaction_start()
3840 self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
3841 "objectUUID": b"0123456789abcde3"})
3842 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3843 base="dc=samba,dc=org")
3844 self.assertEqual(len(res), 1)
3845 self.ldb.transaction_cancel()
3847 # Check that we can not see the record added by the cancelled
3849 # Currently this fails as ldb does not support true nested
3850 # transactions, and only the outer commits and cancels have an effect
3852 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3853 base="dc=samba,dc=org")
3855 # FIXME this test currently passes on a failure, i.e. if nested
3856 # transaction support worked correctly the correct test would
3858 # self.assertEqual(len(res), 0)
3859 # as the add of objectUUID=0123456789abcde3 would reverted when
3860 # the sub transaction it was nested in was rolled back.
3862 # Currently this is not the case so the record is still present.
3863 self.assertEqual(len(res), 1)
3866 # Commit the outer transaction
3868 self.ldb.transaction_commit()
3870 # Now check we can still see the records added in the outer
3873 res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3874 base="dc=samba,dc=org")
3875 self.assertEqual(len(res), 1)
3876 res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3877 base="dc=samba,dc=org")
3878 self.assertEqual(len(res), 1)
3880 # And that we can't see the records added by the nested transaction.
3882 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3883 base="dc=samba,dc=org")
3884 # FIXME again if nested transactions worked correctly we would not
3885 # see this record. The test should be.
3886 # self.assertEqual(len(res), 0)
3887 self.assertEqual(len(res), 1)
3890 super(NestedTransactionTests, self).tearDown()
3893 class LmdbNestedTransactionTests(NestedTransactionTests):
3896 if os.environ.get('HAVE_LMDB', '1') == '0':
3897 self.skipTest("No lmdb backend")
3898 self.prefix = MDB_PREFIX
3899 self.index = MDB_INDEX_OBJ
3900 super(LmdbNestedTransactionTests, self).setUp()
3903 super(LmdbNestedTransactionTests, self).tearDown()
3906 if __name__ == '__main__':
3908 unittest.TestProgram()