2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
13 PY3 = sys.version_info > (3, 0)
21 "@IDXGUID": [b"objectUUID"],
22 "@IDX_DN_GUID": [b"GUID"]
29 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
32 return tempfile.mkdtemp(dir=dir_prefix)
class NoContextTests(TestCase):
    """Checks for module-level ldb helpers that are called without a database handle."""

    def test_valid_attr_name(self):
        """``foo`` is accepted as an attribute name while ``24foo`` is rejected."""
        self.assertTrue(ldb.valid_attr_name("foo"))
        self.assertFalse(ldb.valid_attr_name("24foo"))

    def test_timestring(self):
        """Integer timestamps are rendered as the expected time strings."""
        cases = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for stamp, rendered in cases:
            self.assertEqual(rendered, ldb.timestring(stamp))

    def test_string_to_time(self):
        """Time strings are parsed back into the expected integer timestamps."""
        cases = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for rendered, stamp in cases:
            self.assertEqual(stamp, ldb.string_to_time(rendered))

    def test_binary_encode(self):
        """Encoding round-trips through decode, and str input encodes like bytes."""
        from_bytes = ldb.binary_encode(b'test\\x')
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, b'test\\x')

        # The text form of the same value must produce the same encoding.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
58 class LdbBaseTest(TestCase):
60 super(LdbBaseTest, self).setUp()
62 if self.prefix is None:
63 self.prefix = TDB_PREFIX
64 except AttributeError:
65 self.prefix = TDB_PREFIX
68 super(LdbBaseTest, self).tearDown()
71 return self.prefix + self.filename
74 if self.prefix == MDB_PREFIX:
80 class SimpleLdb(LdbBaseTest):
83 super(SimpleLdb, self).setUp()
84 self.testdir = tempdir()
85 self.filename = os.path.join(self.testdir, "test.ldb")
86 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
88 self.ldb.add(self.index)
89 except AttributeError:
93 shutil.rmtree(self.testdir)
94 super(SimpleLdb, self).tearDown()
95 # Ensure the LDB is closed now, so we close the FD
def test_connect(self):
    """Opening a connection with the fixture's URL and flags must not raise."""
    open_flags = self.flags()
    ldb.Ldb(self.url(), flags=open_flags)
101 def test_connect_none(self):
104 def test_connect_later(self):
106 x.connect(self.url(), flags=self.flags())
110 self.assertTrue(repr(x).startswith("<ldb connection"))
112 def test_set_create_perms(self):
114 x.set_create_perms(0o600)
116 def test_modules_none(self):
118 self.assertEqual([], x.modules())
def test_modules_tdb(self):
    """The module list of a connected database has the expected 'tdb' repr."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    modules_repr = repr(conn.modules())
    self.assertEqual("[<ldb module 'tdb'>]", modules_repr)
124 def test_firstmodule_none(self):
126 self.assertEqual(x.firstmodule, None)
128 def test_firstmodule_tdb(self):
129 x = ldb.Ldb(self.url(), flags=self.flags())
131 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
    """A search with no arguments on the newly opened database returns nothing."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    found = conn.search()
    self.assertEqual(len(found), 0)
def test_search_controls(self):
    """A search carrying a paged_results control is accepted and returns nothing."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    found = conn.search(controls=["paged_results:0:5"])
    self.assertEqual(len(found), 0)
def test_utf8_ldb_Dn(self):
    """Building a Dn from text decoded from UTF-8 bytes must not raise.

    There is no explicit assertion; the test passes when construction
    completes.
    """
    conn = ldb.Ldb(self.url(), flags=self.flags())
    encoded = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
    ldb.Dn(conn, encoded.decode('utf8'))
145 def test_utf8_encoded_ldb_Dn(self):
146 l = ldb.Ldb(self.url(), flags=self.flags())
147 dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
149 dn = ldb.Dn(l, dn_encoded_utf8)
150 except UnicodeDecodeError as e:
152 except TypeError as te:
154 p3errors = ["argument 2 must be str, not bytes",
155 "Can't convert 'bytes' object to str implicitly"]
156 self.assertIn(str(te), p3errors)
def test_search_attrs(self):
    """A SUBTREE search from an empty Dn object asking for ``dc`` returns nothing."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    root = ldb.Dn(conn, "")
    found = conn.search(root, ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_string_dn(self):
    """The search base may be given as a plain string instead of a Dn object."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    found = conn.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_attr_string(self):
    """Passing a bare str or bytes value as ``attrs`` raises TypeError."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    for bad_attrs in ("dc", b"dc"):
        with self.assertRaises(TypeError):
            conn.search(attrs=bad_attrs)
def test_opaque(self):
    """A stored opaque value can be read back; an unknown name yields None."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    conn.set_opaque("my_opaque", conn)
    # The dedicated None assertions report the offending value on failure,
    # unlike assertTrue(... is not None), which only says "False is not true".
    self.assertIsNotNone(conn.get_opaque("my_opaque"))
    self.assertIsNone(conn.get_opaque("unknown"))
179 def test_search_scope_base_empty_db(self):
180 l = ldb.Ldb(self.url(), flags=self.flags())
181 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
def test_search_scope_onelevel_empty_db(self):
    """A ONELEVEL search under dc=foo1 is expected to return no entries."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    base_dn = ldb.Dn(conn, "dc=foo1")
    found = conn.search(base_dn, ldb.SCOPE_ONELEVEL)
    self.assertEqual(len(found), 0)
def test_delete(self):
    """Deleting dc=foo2, which this test never adds, raises LdbError."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        conn.delete(ldb.Dn(conn, "dc=foo2"))
193 def test_delete_w_unhandled_ctrl(self):
194 l = ldb.Ldb(self.url(), flags=self.flags())
196 m.dn = ldb.Dn(l, "dc=foo1")
198 m["objectUUID"] = b"0123456789abcdef"
200 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
203 def test_contains(self):
205 l = ldb.Ldb(name, flags=self.flags())
206 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
207 l = ldb.Ldb(name, flags=self.flags())
209 m.dn = ldb.Dn(l, "dc=foo3")
211 m["objectUUID"] = b"0123456789abcdef"
214 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
215 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
def test_get_config_basedn(self):
    """get_config_basedn() is expected to return None for this database."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    config_dn = conn.get_config_basedn()
    self.assertEqual(None, config_dn)
def test_get_root_basedn(self):
    """get_root_basedn() is expected to return None for this database."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    root_dn = conn.get_root_basedn()
    self.assertEqual(None, root_dn)
def test_get_schema_basedn(self):
    """get_schema_basedn() is expected to return None for this database."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    schema_dn = conn.get_schema_basedn()
    self.assertEqual(None, schema_dn)
def test_get_default_basedn(self):
    """get_default_basedn() is expected to return None for this database."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    default_dn = conn.get_default_basedn()
    self.assertEqual(None, default_dn)
236 l = ldb.Ldb(self.url(), flags=self.flags())
238 m.dn = ldb.Dn(l, "dc=foo4")
240 m["objectUUID"] = b"0123456789abcdef"
241 self.assertEqual(len(l.search()), 0)
244 self.assertEqual(len(l.search()), 1)
246 l.delete(ldb.Dn(l, "dc=foo4"))
248 def test_search_iterator(self):
249 l = ldb.Ldb(self.url(), flags=self.flags())
250 s = l.search_iterator()
256 except RuntimeError as re:
261 except RuntimeError as re:
266 except RuntimeError as re:
269 s = l.search_iterator()
272 self.assertTrue(isinstance(me, ldb.Message))
275 self.assertEqual(len(r), 0)
276 self.assertEqual(count, 0)
279 m1.dn = ldb.Dn(l, "dc=foo4")
281 m1["objectUUID"] = b"0123456789abcdef"
284 s = l.search_iterator()
287 self.assertTrue(isinstance(me, ldb.Message))
291 self.assertEqual(len(r), 0)
292 self.assertEqual(len(msgs), 1)
293 self.assertEqual(msgs[0].dn, m1.dn)
296 m2.dn = ldb.Dn(l, "dc=foo5")
298 m2["objectUUID"] = b"0123456789abcdee"
301 s = l.search_iterator()
304 self.assertTrue(isinstance(me, ldb.Message))
308 self.assertEqual(len(r), 0)
309 self.assertEqual(len(msgs), 2)
310 if msgs[0].dn == m1.dn:
311 self.assertEqual(msgs[0].dn, m1.dn)
312 self.assertEqual(msgs[1].dn, m2.dn)
314 self.assertEqual(msgs[0].dn, m2.dn)
315 self.assertEqual(msgs[1].dn, m1.dn)
317 s = l.search_iterator()
320 self.assertTrue(isinstance(me, ldb.Message))
327 except RuntimeError as re:
330 self.assertTrue(isinstance(me, ldb.Message))
338 self.assertEqual(len(r), 0)
339 self.assertEqual(len(msgs), 2)
340 if msgs[0].dn == m1.dn:
341 self.assertEqual(msgs[0].dn, m1.dn)
342 self.assertEqual(msgs[1].dn, m2.dn)
344 self.assertEqual(msgs[0].dn, m2.dn)
345 self.assertEqual(msgs[1].dn, m1.dn)
347 l.delete(ldb.Dn(l, "dc=foo4"))
348 l.delete(ldb.Dn(l, "dc=foo5"))
350 def test_add_text(self):
351 l = ldb.Ldb(self.url(), flags=self.flags())
353 m.dn = ldb.Dn(l, "dc=foo4")
355 m["objectUUID"] = b"0123456789abcdef"
356 self.assertEqual(len(l.search()), 0)
359 self.assertEqual(len(l.search()), 1)
361 l.delete(ldb.Dn(l, "dc=foo4"))
363 def test_add_w_unhandled_ctrl(self):
364 l = ldb.Ldb(self.url(), flags=self.flags())
366 m.dn = ldb.Dn(l, "dc=foo4")
368 self.assertEqual(len(l.search()), 0)
369 self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
371 def test_add_dict(self):
372 l = ldb.Ldb(self.url(), flags=self.flags())
373 m = {"dn": ldb.Dn(l, "dc=foo5"),
375 "objectUUID": b"0123456789abcdef"}
376 self.assertEqual(len(l.search()), 0)
379 self.assertEqual(len(l.search()), 1)
381 l.delete(ldb.Dn(l, "dc=foo5"))
383 def test_add_dict_text(self):
384 l = ldb.Ldb(self.url(), flags=self.flags())
385 m = {"dn": ldb.Dn(l, "dc=foo5"),
387 "objectUUID": b"0123456789abcdef"}
388 self.assertEqual(len(l.search()), 0)
391 self.assertEqual(len(l.search()), 1)
393 l.delete(ldb.Dn(l, "dc=foo5"))
395 def test_add_dict_string_dn(self):
396 l = ldb.Ldb(self.url(), flags=self.flags())
397 m = {"dn": "dc=foo6", "bla": b"bla",
398 "objectUUID": b"0123456789abcdef"}
399 self.assertEqual(len(l.search()), 0)
402 self.assertEqual(len(l.search()), 1)
404 l.delete(ldb.Dn(l, "dc=foo6"))
406 def test_add_dict_bytes_dn(self):
407 l = ldb.Ldb(self.url(), flags=self.flags())
408 m = {"dn": b"dc=foo6", "bla": b"bla",
409 "objectUUID": b"0123456789abcdef"}
410 self.assertEqual(len(l.search()), 0)
413 self.assertEqual(len(l.search()), 1)
415 l.delete(ldb.Dn(l, "dc=foo6"))
417 def test_rename(self):
418 l = ldb.Ldb(self.url(), flags=self.flags())
420 m.dn = ldb.Dn(l, "dc=foo7")
422 m["objectUUID"] = b"0123456789abcdef"
423 self.assertEqual(len(l.search()), 0)
426 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
427 self.assertEqual(len(l.search()), 1)
429 l.delete(ldb.Dn(l, "dc=bar"))
431 def test_rename_string_dns(self):
432 l = ldb.Ldb(self.url(), flags=self.flags())
434 m.dn = ldb.Dn(l, "dc=foo8")
436 m["objectUUID"] = b"0123456789abcdef"
437 self.assertEqual(len(l.search()), 0)
439 self.assertEqual(len(l.search()), 1)
441 l.rename("dc=foo8", "dc=bar")
442 self.assertEqual(len(l.search()), 1)
444 l.delete(ldb.Dn(l, "dc=bar"))
446 def test_rename_bad_string_dns(self):
447 l = ldb.Ldb(self.url(), flags=self.flags())
449 m.dn = ldb.Dn(l, "dc=foo8")
451 m["objectUUID"] = b"0123456789abcdef"
452 self.assertEqual(len(l.search()), 0)
454 self.assertEqual(len(l.search()), 1)
455 self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
456 self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
457 l.delete(ldb.Dn(l, "dc=foo8"))
459 def test_empty_dn(self):
460 l = ldb.Ldb(self.url(), flags=self.flags())
461 self.assertEqual(0, len(l.search()))
463 m.dn = ldb.Dn(l, "dc=empty")
464 m["objectUUID"] = b"0123456789abcdef"
467 self.assertEqual(1, len(rm))
468 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
472 self.assertEqual(1, len(rm))
473 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
475 rm = l.search(m.dn, attrs=["blah"])
476 self.assertEqual(1, len(rm))
477 self.assertEqual(0, len(rm[0]))
479 def test_modify_delete(self):
480 l = ldb.Ldb(self.url(), flags=self.flags())
482 m.dn = ldb.Dn(l, "dc=modifydelete")
484 m["objectUUID"] = b"0123456789abcdef"
486 rm = l.search(m.dn)[0]
487 self.assertEqual([b"1234"], list(rm["bla"]))
490 m.dn = ldb.Dn(l, "dc=modifydelete")
491 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
492 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
495 self.assertEqual(1, len(rm))
496 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
498 rm = l.search(m.dn, attrs=["bla"])
499 self.assertEqual(1, len(rm))
500 self.assertEqual(0, len(rm[0]))
502 l.delete(ldb.Dn(l, "dc=modifydelete"))
504 def test_modify_delete_text(self):
505 l = ldb.Ldb(self.url(), flags=self.flags())
507 m.dn = ldb.Dn(l, "dc=modifydelete")
508 m.text["bla"] = ["1234"]
509 m["objectUUID"] = b"0123456789abcdef"
511 rm = l.search(m.dn)[0]
512 self.assertEqual(["1234"], list(rm.text["bla"]))
515 m.dn = ldb.Dn(l, "dc=modifydelete")
516 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
517 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
520 self.assertEqual(1, len(rm))
521 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
523 rm = l.search(m.dn, attrs=["bla"])
524 self.assertEqual(1, len(rm))
525 self.assertEqual(0, len(rm[0]))
527 l.delete(ldb.Dn(l, "dc=modifydelete"))
529 def test_modify_add(self):
530 l = ldb.Ldb(self.url(), flags=self.flags())
532 m.dn = ldb.Dn(l, "dc=add")
534 m["objectUUID"] = b"0123456789abcdef"
538 m.dn = ldb.Dn(l, "dc=add")
539 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
540 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
542 rm = l.search(m.dn)[0]
543 self.assertEqual(3, len(rm))
544 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
546 l.delete(ldb.Dn(l, "dc=add"))
548 def test_modify_add_text(self):
549 l = ldb.Ldb(self.url(), flags=self.flags())
551 m.dn = ldb.Dn(l, "dc=add")
552 m.text["bla"] = ["1234"]
553 m["objectUUID"] = b"0123456789abcdef"
557 m.dn = ldb.Dn(l, "dc=add")
558 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
559 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
561 rm = l.search(m.dn)[0]
562 self.assertEqual(3, len(rm))
563 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
565 l.delete(ldb.Dn(l, "dc=add"))
567 def test_modify_replace(self):
568 l = ldb.Ldb(self.url(), flags=self.flags())
570 m.dn = ldb.Dn(l, "dc=modify2")
571 m["bla"] = [b"1234", b"456"]
572 m["objectUUID"] = b"0123456789abcdef"
576 m.dn = ldb.Dn(l, "dc=modify2")
577 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
578 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
580 rm = l.search(m.dn)[0]
581 self.assertEqual(3, len(rm))
582 self.assertEqual([b"789"], list(rm["bla"]))
583 rm = l.search(m.dn, attrs=["bla"])[0]
584 self.assertEqual(1, len(rm))
586 l.delete(ldb.Dn(l, "dc=modify2"))
588 def test_modify_replace_text(self):
589 l = ldb.Ldb(self.url(), flags=self.flags())
591 m.dn = ldb.Dn(l, "dc=modify2")
592 m.text["bla"] = ["1234", "456"]
593 m["objectUUID"] = b"0123456789abcdef"
597 m.dn = ldb.Dn(l, "dc=modify2")
598 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
599 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
601 rm = l.search(m.dn)[0]
602 self.assertEqual(3, len(rm))
603 self.assertEqual(["789"], list(rm.text["bla"]))
604 rm = l.search(m.dn, attrs=["bla"])[0]
605 self.assertEqual(1, len(rm))
607 l.delete(ldb.Dn(l, "dc=modify2"))
609 def test_modify_flags_change(self):
610 l = ldb.Ldb(self.url(), flags=self.flags())
612 m.dn = ldb.Dn(l, "dc=add")
614 m["objectUUID"] = b"0123456789abcdef"
618 m.dn = ldb.Dn(l, "dc=add")
619 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
620 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
622 rm = l.search(m.dn)[0]
623 self.assertEqual(3, len(rm))
624 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
626 # Now create another modify, but switch the flags before we do it
627 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
628 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
630 rm = l.search(m.dn, attrs=["bla"])[0]
631 self.assertEqual(1, len(rm))
632 self.assertEqual([b"1234"], list(rm["bla"]))
634 l.delete(ldb.Dn(l, "dc=add"))
636 def test_modify_flags_change_text(self):
637 l = ldb.Ldb(self.url(), flags=self.flags())
639 m.dn = ldb.Dn(l, "dc=add")
640 m.text["bla"] = ["1234"]
641 m["objectUUID"] = b"0123456789abcdef"
645 m.dn = ldb.Dn(l, "dc=add")
646 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
647 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
649 rm = l.search(m.dn)[0]
650 self.assertEqual(3, len(rm))
651 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
653 # Now create another modify, but switch the flags before we do it
654 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
655 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
657 rm = l.search(m.dn, attrs=["bla"])[0]
658 self.assertEqual(1, len(rm))
659 self.assertEqual(["1234"], list(rm.text["bla"]))
661 l.delete(ldb.Dn(l, "dc=add"))
663 def test_transaction_commit(self):
664 l = ldb.Ldb(self.url(), flags=self.flags())
665 l.transaction_start()
666 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
668 m["objectUUID"] = b"0123456789abcdef"
670 l.transaction_commit()
673 def test_transaction_cancel(self):
674 l = ldb.Ldb(self.url(), flags=self.flags())
675 l.transaction_start()
676 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
678 m["objectUUID"] = b"0123456789abcdee"
680 l.transaction_cancel()
681 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
683 def test_set_debug(self):
684 def my_report_fn(level, text):
686 l = ldb.Ldb(self.url(), flags=self.flags())
687 l.set_debug(my_report_fn)
689 def test_zero_byte_string(self):
690 """Testing we do not get trapped in the \0 byte in a property string."""
691 l = ldb.Ldb(self.url(), flags=self.flags())
694 "objectclass": b"user",
695 "cN": b"LDAPtestUSER",
696 "givenname": b"ldap",
697 "displayname": b"foo\0bar",
698 "objectUUID": b"0123456789abcdef"
700 res = l.search(expression="(dn=dc=somedn)")
701 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
    """A filter lacking its outer parentheses raises LdbError rather than crashing."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        conn.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])
707 # Run the SimpleLdb tests against an lmdb backend
710 class SimpleLdbLmdb(SimpleLdb):
713 if os.environ.get('HAVE_LMDB', '1') == '0':
714 self.skipTest("No lmdb backend")
715 self.prefix = MDB_PREFIX
716 self.index = MDB_INDEX_OBJ
717 super(SimpleLdbLmdb, self).setUp()
720 super(SimpleLdbLmdb, self).tearDown()
723 class SimpleLdbNoLmdb(LdbBaseTest):
726 if os.environ.get('HAVE_LMDB', '1') != '0':
727 self.skipTest("lmdb backend enabled")
728 self.prefix = MDB_PREFIX
729 self.index = MDB_INDEX_OBJ
730 super(SimpleLdbNoLmdb, self).setUp()
733 super(SimpleLdbNoLmdb, self).tearDown()
735 def test_lmdb_disabled(self):
736 self.testdir = tempdir()
737 self.filename = os.path.join(self.testdir, "test.ldb")
739 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
740 self.fail("Should have failed on missing LMDB")
741 except ldb.LdbError as err:
743 self.assertEqual(enum, ldb.ERR_OTHER)
746 class SearchTests(LdbBaseTest):
748 shutil.rmtree(self.testdir)
749 super(SearchTests, self).tearDown()
751 # Ensure the LDB is closed now, so we close the FD
755 super(SearchTests, self).setUp()
756 self.testdir = tempdir()
757 self.filename = os.path.join(self.testdir, "search_test.ldb")
758 options = ["modules:rdn_name"]
759 if hasattr(self, 'IDXCHECK'):
760 options.append("disable_full_db_scan_for_self_test:1")
761 self.l = ldb.Ldb(self.url(),
765 self.l.add(self.index)
766 except AttributeError:
769 self.l.add({"dn": "@ATTRIBUTES",
770 "DC": "CASE_INSENSITIVE"})
772 # Note that we can't use the name objectGUID here, as we
773 # want to stay clear of the objectGUID handler in LDB and
774 # instead use just the 16 bytes raw, which we just keep
775 # to printable chars here for ease of handling.
777 self.l.add({"dn": "DC=SAMBA,DC=ORG",
778 "name": b"samba.org",
779 "objectUUID": b"0123456789abcdef"})
780 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
783 "objectUUID": b"0123456789abcde1"})
784 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
787 "objectUUID": b"0123456789abcde2"})
788 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
791 "objectUUID": b"0123456789abcde3"})
792 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
795 "objectUUID": b"0123456789abcde4"})
796 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
799 "objectUUID": b"0123456789abcde5"})
800 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
803 "objectUUID": b"0123456789abcde6"})
804 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
807 "objectUUID": b"0123456789abcde7"})
808 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
811 "objectUUID": b"0123456789abcde8"})
812 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
815 "objectUUID": b"0123456789abcde9"})
816 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
819 "objectUUID": b"0123456789abcde0"})
820 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
823 "objectUUID": b"0123456789abcdea"})
824 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
827 "objectUUID": b"0123456789abcdeb"})
828 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
831 "objectUUID": b"0123456789abcdec"})
832 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
835 "objectUUID": b"0123456789abcded"})
836 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
839 "objectUUID": b"0123456789abcdee"})
840 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
843 "objectUUID": b"0123456789abcd01"})
844 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
847 "objectUUID": b"0123456789abcd02"})
848 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
851 "objectUUID": b"0123456789abcd03"})
852 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
855 "objectUUID": b"0123456789abcd04"})
856 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
859 "objectUUID": b"0123456789abcd05"})
860 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
863 "objectUUID": b"0123456789abcd06"})
864 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
867 "objectUUID": b"0123456789abcd07"})
868 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
871 "objectUUID": b"0123456789abcd08"})
872 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
875 "objectUUID": b"0123456789abcd09"})
878 """Testing a search"""
880 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
881 scope=ldb.SCOPE_BASE)
882 self.assertEqual(len(res11), 1)
def test_base_lower(self):
    """A BASE search whose base DN has lower-cased DC parts finds one entry."""
    base_dn = "OU=OU11,DC=samba,DC=org"
    matches = self.l.search(base=base_dn, scope=ldb.SCOPE_BASE)
    self.assertEqual(len(matches), 1)
def test_base_or(self):
    """BASE search at OU=OU11 with an OR over two ``ou`` values finds one entry."""
    ldap_filter = "(|(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_BASE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_base_or2(self):
    """BASE search at OU=OU11 with an OR over ``x`` and ``y`` terms finds one entry."""
    ldap_filter = "(|(x=y)(y=b))"
    matches = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_BASE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_base_and(self):
    """BASE search at OU=OU11 requiring two different ``ou`` values finds nothing."""
    ldap_filter = "(&(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_BASE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_base_and2(self):
    """BASE search at OU=OU11 with an AND of ``x=y`` and ``y=a`` finds one entry."""
    ldap_filter = "(&(x=y)(y=a))"
    matches = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_BASE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_base_false(self):
    """BASE search at OU=OU11 with an OR over ou13 and ou12 finds nothing."""
    ldap_filter = "(|(ou=ou13)(ou=ou12))"
    matches = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_BASE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_check_base_false(self):
    """BASE search at the existing OU=OU11 with a non-matching OR filter finds nothing."""
    search_args = {
        "base": "OU=OU11,DC=SAMBA,DC=ORG",
        "scope": ldb.SCOPE_BASE,
        "expression": "(|(ou=ou13)(ou=ou12))",
    }
    matches = self.l.search(**search_args)
    self.assertEqual(len(matches), 0)
938 def test_check_base_error(self):
939 """Testing a search"""
940 checkbaseonsearch = {"dn": "@OPTIONS",
941 "checkBaseOnSearch": b"TRUE"}
943 self.l.add(checkbaseonsearch)
944 except ldb.LdbError as err:
946 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
947 m = ldb.Message.from_dict(self.l,
952 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
953 scope=ldb.SCOPE_BASE,
954 expression="(|(ou=ou13)(ou=ou12))")
955 self.fail("Should have failed on missing base")
956 except ldb.LdbError as err:
958 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
def test_subtree_and(self):
    """SUBTREE search requiring two different ``ou`` values at once finds nothing."""
    ldap_filter = "(&(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_subtree_and2(self):
    """SUBTREE search with an AND wrapping a nested OR finds exactly one entry."""
    ldap_filter = "(&(x=y)(|(y=b)(y=c)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_subtree_and2_lower(self):
    """Same AND/OR filter as test_subtree_and2 but with a lower-cased base DN."""
    ldap_filter = "(&(x=y)(|(y=b)(y=c)))"
    matches = self.l.search(base="DC=samba,DC=org",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_subtree_or(self):
    """SUBTREE search with an OR over two ``ou`` values finds two entries."""
    ldap_filter = "(|(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 2)
def test_subtree_or2(self):
    """SUBTREE search with an OR over ``x=y`` and ``y=b`` finds twenty entries."""
    ldap_filter = "(|(x=y)(y=b))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 20)
def test_subtree_or3(self):
    """SUBTREE search with a three-term OR finds twenty-two entries."""
    ldap_filter = "(|(x=y)(y=b)(y=c))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 22)
def test_one_and(self):
    """ONELEVEL search requiring two different ``ou`` values at once finds nothing."""
    ldap_filter = "(&(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_one_and2(self):
    """ONELEVEL search with an AND of ``x=y`` and ``y=b`` finds one entry."""
    ldap_filter = "(&(x=y)(y=b))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_one_or(self):
    """ONELEVEL search with an OR over two ``ou`` values finds two entries."""
    ldap_filter = "(|(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 2)
def test_one_or2(self):
    """ONELEVEL search with an OR over ``x=y`` and ``y=b`` finds twenty entries."""
    ldap_filter = "(|(x=y)(y=b))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 20)
def test_one_or2_lower(self):
    """Same OR filter as test_one_or2 but with a lower-cased base DN."""
    ldap_filter = "(|(x=y)(y=b))"
    matches = self.l.search(base="DC=samba,DC=org",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 20)
def test_one_unindexable(self):
    """ONELEVEL search with the substring filter ``(y=b*)``.

    When the fixture defines ``IDX`` and ``IDXCHECK`` but not ``IDXONE``
    the search is expected to be refused with ERR_INAPPROPRIATE_MATCHING;
    otherwise nine entries are expected.
    """
    # NOTE(review): the ``try:`` line and the ``enum``/``estr`` assignments
    # were not visible in the reviewed extract. They are reconstructed from
    # the surviving ``except`` clause and the names it reads -- confirm
    # against the full file before applying.
    try:
        res11 = self.l.search(base="DC=samba,DC=org",
                              scope=ldb.SCOPE_ONELEVEL,
                              expression="(y=b*)")
        if hasattr(self, 'IDX') and \
           not hasattr(self, 'IDXONE') and \
           hasattr(self, 'IDXCHECK'):
            self.fail("Should have failed as un-indexed search")

        self.assertEqual(len(res11), 9)

    except ldb.LdbError as err:
        enum = err.args[0]
        estr = err.args[1]
        self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
        # assertIn(member, container): the expected text is the member.
        # With the arguments swapped, an empty or truncated error string
        # would have passed vacuously.
        self.assertIn("ldb FULL SEARCH disabled", estr)
1068 def test_one_unindexable_presence(self):
1069 """Testing a search"""
1072 res11 = self.l.search(base="DC=samba,DC=org",
1073 scope=ldb.SCOPE_ONELEVEL,
1075 if hasattr(self, 'IDX') and \
1076 not hasattr(self, 'IDXONE') and \
1077 hasattr(self, 'IDXCHECK'):
1078 self.fail("Should have failed as un-indexed search")
1080 self.assertEqual(len(res11), 24)
1082 except ldb.LdbError as err:
1085 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1086 self.assertIn(estr, "ldb FULL SEARCH disabled")
def test_subtree_and_or(self):
    """SUBTREE search with an OR clause first inside an AND finds nothing."""
    ldap_filter = "(&(|(x=z)(y=b))(x=x)(y=c))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_subtree_and_or2(self):
    """SUBTREE search with the OR clause last inside an AND finds nothing."""
    ldap_filter = "(&(x=x)(y=c)(|(x=z)(y=b)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_subtree_and_or3(self):
    """SUBTREE search ANDing an ``ou`` OR with an ``x``/``y`` OR finds two entries."""
    ldap_filter = "(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 2)
def test_subtree_and_or4(self):
    """Same clauses as test_subtree_and_or3 in the opposite order; two entries."""
    ldap_filter = "(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 2)
def test_subtree_and_or5(self):
    """SUBTREE search ANDing a three-term OR with a single ``ou`` value; one entry."""
    ldap_filter = "(&(|(x=y)(y=b)(y=c))(ou=ou11))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_subtree_or_and(self):
    """SUBTREE search with an AND nested inside an OR finds ten entries."""
    ldap_filter = "(|(x=x)(y=c)(&(x=z)(y=b)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 10)
def test_subtree_large_and_unique(self):
    """SUBTREE search ANDing ``ou=ou10`` with ``y=a`` finds one entry."""
    ldap_filter = "(&(ou=ou10)(y=a))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_subtree_and_none(self):
    """SUBTREE search ANDing ``ou=ouX`` with ``y=a`` finds nothing."""
    ldap_filter = "(&(ou=ouX)(y=a))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_subtree_and_idx_record(self):
    """A SUBTREE search filtering on the ``@IDXDN`` attribute finds nothing."""
    ldap_filter = "(@IDXDN=DC=SAMBA,DC=ORG)"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_subtree_and_idxone_record(self):
    """A SUBTREE search filtering on the ``@IDXONE`` attribute finds nothing."""
    ldap_filter = "(@IDXONE=DC=SAMBA,DC=ORG)"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 0)
def test_subtree_unindexable(self):
    """SUBTREE search with the substring filter ``(y=b*)``.

    When the fixture defines both ``IDX`` and ``IDXCHECK`` the search is
    expected to be refused with ERR_INAPPROPRIATE_MATCHING; otherwise nine
    entries are expected.
    """
    # NOTE(review): the ``try:`` line and the ``enum``/``estr`` assignments
    # were not visible in the reviewed extract. They are reconstructed from
    # the surviving ``except`` clause and the names it reads -- confirm
    # against the full file before applying.
    try:
        res11 = self.l.search(base="DC=samba,DC=org",
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(y=b*)")
        if hasattr(self, 'IDX') and \
           hasattr(self, 'IDXCHECK'):
            self.fail("Should have failed as un-indexed search")

        self.assertEqual(len(res11), 9)

    except ldb.LdbError as err:
        enum = err.args[0]
        estr = err.args[1]
        self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
        # assertIn(member, container): the expected text is the member.
        # With the arguments swapped, an empty or truncated error string
        # would have passed vacuously.
        self.assertIn("ldb FULL SEARCH disabled", estr)
1187 def test_subtree_unindexable_presence(self):
1188 """Testing a search"""
1191 res11 = self.l.search(base="DC=samba,DC=org",
1192 scope=ldb.SCOPE_SUBTREE,
1194 if hasattr(self, 'IDX') and \
1195 hasattr(self, 'IDXCHECK'):
1196 self.fail("Should have failed as un-indexed search")
1198 self.assertEqual(len(res11), 24)
1200 except ldb.LdbError as err:
1203 self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1204 self.assertIn(estr, "ldb FULL SEARCH disabled")
1206 def test_dn_filter_one(self):
1207 """Testing that a dn= filter succeeds
1208 (or fails with disallowDNFilter
1209 set and IDXGUID or (IDX and not IDXONE) mode)
1210 when the scope is SCOPE_ONELEVEL.
1212 This should be made more consistent, but for now lock in
1217 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1218 scope=ldb.SCOPE_ONELEVEL,
1219 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1220 if hasattr(self, 'disallowDNFilter') and \
1221 hasattr(self, 'IDX') and \
1222 (hasattr(self, 'IDXGUID') or
1223 ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1224 self.assertEqual(len(res11), 0)
1226 self.assertEqual(len(res11), 1)
1228 def test_dn_filter_subtree(self):
1229 """Testing that a dn= filter succeeds
1230 (or fails with disallowDNFilter set)
1231 when the scope is SCOPE_SUBTREE"""
1233 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1234 scope=ldb.SCOPE_SUBTREE,
1235 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1236 if hasattr(self, 'disallowDNFilter') \
1237 and hasattr(self, 'IDX'):
1238 self.assertEqual(len(res11), 0)
1240 self.assertEqual(len(res11), 1)
def test_dn_filter_base(self):
    """A ``dn=`` filter (incorrectly) matches when the scope is SCOPE_BASE."""
    target = "OU=OU1,DC=SAMBA,DC=ORG"
    matches = self.l.search(base=target,
                            scope=ldb.SCOPE_BASE,
                            expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")

    # Known quirk: this should eventually be fixed, but the fix is not
    # trivial, so the current behaviour is pinned here.
    self.assertEqual(len(matches), 1)
1253 def test_distinguishedName_filter_one(self):
1254 """Testing that a distinguishedName= filter succeeds
1255 when the scope is SCOPE_ONELEVEL.
1257 This should be made more consistent, but for now lock in
1262 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1263 scope=ldb.SCOPE_ONELEVEL,
1264 expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1265 self.assertEqual(len(res11), 1)
def test_distinguishedName_filter_subtree(self):
    """A ``distinguishedName=`` filter finds its entry under SCOPE_SUBTREE."""
    ldap_filter = "(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=ldap_filter)
    self.assertEqual(len(matches), 1)
def test_distinguishedName_filter_base(self):
    """Check that a distinguishedName= filter (incorrectly) matches
    under SCOPE_BASE."""
    target = "OU=OU1,DC=SAMBA,DC=ORG"
    found = self.l.search(
        base=target,
        scope=ldb.SCOPE_BASE,
        expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")

    # Locks in current behaviour; fixing it is known to be non-trivial.
    self.assertEqual(len(found), 1)
def test_bad_dn_filter_base(self):
    """Check that a dn= filter holding an invalid DN is accepted under
    SCOPE_BASE and simply matches nothing."""
    query = dict(base="OU=OU1,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
    found = self.l.search(**query)

    # Original note kept: this area is flagged as "fix someday".
    self.assertEqual(len(found), 0)
def test_bad_dn_filter_one(self):
    """Check that a dn= filter holding an invalid DN is accepted under
    SCOPE_ONELEVEL and matches nothing."""
    bad_filter = "(dn=OU=OU1,DC=SAMBA,DCXXXX)"
    found = self.l.search(base="DC=SAMBA,DC=ORG",
                          scope=ldb.SCOPE_ONELEVEL,
                          expression=bad_filter)
    self.assertEqual(len(found), 0)
def test_bad_dn_filter_subtree(self):
    """Check that a dn= filter holding an invalid DN is accepted under
    SCOPE_SUBTREE and matches nothing."""
    bad_filter = "(dn=OU=OU1,DC=SAMBA,DCXXXX)"
    found = self.l.search(base="DC=SAMBA,DC=ORG",
                          scope=ldb.SCOPE_SUBTREE,
                          expression=bad_filter)
    self.assertEqual(len(found), 0)
def test_bad_distinguishedName_filter_base(self):
    """Check that a distinguishedName= filter holding an invalid DN is
    accepted under SCOPE_BASE and matches nothing."""
    query = dict(base="OU=OU1,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
    found = self.l.search(**query)

    # Original note kept: this area is flagged as "fix someday".
    self.assertEqual(len(found), 0)
def test_bad_distinguishedName_filter_one(self):
    """Check that a distinguishedName= filter holding an invalid DN is
    accepted under SCOPE_ONELEVEL and matches nothing."""
    bad_filter = "(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)"
    found = self.l.search(base="DC=SAMBA,DC=ORG",
                          scope=ldb.SCOPE_ONELEVEL,
                          expression=bad_filter)
    self.assertEqual(len(found), 0)
def test_bad_distinguishedName_filter_subtree(self):
    """Check that a distinguishedName= filter holding an invalid DN is
    accepted under SCOPE_SUBTREE and matches nothing."""
    bad_filter = "(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)"
    found = self.l.search(base="DC=SAMBA,DC=ORG",
                          scope=ldb.SCOPE_SUBTREE,
                          expression=bad_filter)
    self.assertEqual(len(found), 0)
def test_bad_dn_search_base(self):
    """A SCOPE_BASE search with a malformed base DN must raise
    ERR_INVALID_DN_SYNTAX."""
    try:
        self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
                      scope=ldb.SCOPE_BASE)
    except ldb.LdbError as err:
        # The first element of the error's args is the ldb error code.
        self.assertEqual(err.args[0], ldb.ERR_INVALID_DN_SYNTAX)
    else:
        self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
def test_bad_dn_search_one(self):
    """A SCOPE_ONELEVEL search with a malformed base DN must raise
    ERR_INVALID_DN_SYNTAX."""
    try:
        self.l.search(base="DC=SAMBA,DCXXXX",
                      scope=ldb.SCOPE_ONELEVEL)
    except ldb.LdbError as err:
        # The first element of the error's args is the ldb error code.
        self.assertEqual(err.args[0], ldb.ERR_INVALID_DN_SYNTAX)
    else:
        self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
def test_bad_dn_search_subtree(self):
    """A SCOPE_SUBTREE search with a malformed base DN must raise
    ERR_INVALID_DN_SYNTAX."""
    try:
        self.l.search(base="DC=SAMBA,DCXXXX",
                      scope=ldb.SCOPE_SUBTREE)
    except ldb.LdbError as err:
        # The first element of the error's args is the ldb error code.
        self.assertEqual(err.args[0], ldb.ERR_INVALID_DN_SYNTAX)
    else:
        self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1393 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """SearchTests re-run with the MDB_PREFIX url prefix and MDB index."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        # Both attributes must be in place before the parent set-up runs.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(SearchTestsLmdb, self).tearDown()
1407 class IndexedSearchTests(SearchTests):
1408 """Test searches using the index, to ensure the index doesn't
1412 super(IndexedSearchTests, self).setUp()
1413 self.l.add({"dn": "@INDEXLIST",
1414 "@IDXATTR": [b"x", b"y", b"ou"]})
class IndexedCheckSearchTests(IndexedSearchTests):
    """Indexed search tests with full scans disabled.

    Guards against the index breaking searches; the IDXCHECK marker is
    set on the test case before the parent set-up runs.
    """

    def setUp(self):
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
1427 class IndexedSearchDnFilterTests(SearchTests):
1428 """Test searches using the index, to ensure the index doesn't
1432 super(IndexedSearchDnFilterTests, self).setUp()
1433 self.l.add({"dn": "@OPTIONS",
1434 "disallowDNFilter": "TRUE"})
1435 self.disallowDNFilter = True
1437 self.l.add({"dn": "@INDEXLIST",
1438 "@IDXATTR": [b"x", b"y", b"ou"]})
1442 class IndexedAndOneLevelSearchTests(SearchTests):
1443 """Test searches using the index including @IDXONE, to ensure
1444 the index doesn't break things"""
1447 super(IndexedAndOneLevelSearchTests, self).setUp()
1448 self.l.add({"dn": "@INDEXLIST",
1449 "@IDXATTR": [b"x", b"y", b"ou"],
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """Indexed (including @IDXONE) search tests with full scans disabled.

    Guards against the index breaking searches; the IDXCHECK marker is
    set on the test case before the parent set-up runs.
    """

    def setUp(self):
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1464 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1465 """Test searches using the index including @IDXONE, to ensure
1466 the index doesn't break things"""
1469 super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1470 self.l.add({"dn": "@OPTIONS",
1471 "disallowDNFilter": "TRUE",
1472 "checkBaseOnSearch": "TRUE"})
1473 self.disallowDNFilter = True
1474 self.checkBaseOnSearch = True
1476 self.l.add({"dn": "@INDEXLIST",
1477 "@IDXATTR": [b"x", b"y", b"ou"],
1483 class GUIDIndexedSearchTests(SearchTests):
1484 """Test searches using the index, to ensure the index doesn't
1488 self.index = {"dn": "@INDEXLIST",
1489 "@IDXATTR": [b"x", b"y", b"ou"],
1490 "@IDXGUID": [b"objectUUID"],
1491 "@IDX_DN_GUID": [b"GUID"]}
1492 super(GUIDIndexedSearchTests, self).setUp()
1498 class GUIDIndexedDNFilterSearchTests(SearchTests):
1499 """Test searches using the index, to ensure the index doesn't
1503 self.index = {"dn": "@INDEXLIST",
1504 "@IDXATTR": [b"x", b"y", b"ou"],
1505 "@IDXGUID": [b"objectUUID"],
1506 "@IDX_DN_GUID": [b"GUID"]}
1507 super(GUIDIndexedDNFilterSearchTests, self).setUp()
1508 self.l.add({"dn": "@OPTIONS",
1509 "disallowDNFilter": "TRUE",
1510 "checkBaseOnSearch": "TRUE"})
1511 self.disallowDNFilter = True
1512 self.checkBaseOnSearch = True
1517 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1518 """Test searches using the index including @IDXONE, to ensure
1519 the index doesn't break things"""
1522 self.index = {"dn": "@INDEXLIST",
1523 "@IDXATTR": [b"x", b"y", b"ou"],
1524 "@IDXGUID": [b"objectUUID"],
1525 "@IDX_DN_GUID": [b"GUID"]}
1526 super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1527 self.l.add({"dn": "@OPTIONS",
1528 "disallowDNFilter": "TRUE",
1529 "checkBaseOnSearch": "TRUE"})
1530 self.disallowDNFilter = True
1531 self.checkBaseOnSearch = True
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """GUIDIndexedSearchTests re-run with the MDB_PREFIX url prefix."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """GUIDIndexedDNFilterSearchTests re-run with the MDB_PREFIX url prefix."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """GUIDAndOneLevelIndexedSearchTests re-run with the MDB_PREFIX url
    prefix."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1573 class AddModifyTests(LdbBaseTest):
1575 shutil.rmtree(self.testdir)
1576 super(AddModifyTests, self).tearDown()
1578 # Ensure the LDB is closed now, so we close the FD
1582 super(AddModifyTests, self).setUp()
1583 self.testdir = tempdir()
1584 self.filename = os.path.join(self.testdir, "add_test.ldb")
1585 self.l = ldb.Ldb(self.url(),
1587 options=["modules:rdn_name"])
1589 self.l.add(self.index)
1590 except AttributeError:
1593 self.l.add({"dn": "DC=SAMBA,DC=ORG",
1594 "name": b"samba.org",
1595 "objectUUID": b"0123456789abcdef"})
1596 self.l.add({"dn": "@ATTRIBUTES",
1597 "objectUUID": "UNIQUE_INDEX"})
1599 def test_add_dup(self):
1600 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1603 "objectUUID": b"0123456789abcde1"})
1605 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1608 "objectUUID": b"0123456789abcde2"})
1609 self.fail("Should have failed adding dupliate entry")
1610 except ldb.LdbError as err:
1612 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1614 def test_add_bad(self):
1616 self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
1619 "objectUUID": b"0123456789abcde1"})
1620 self.fail("Should have failed adding entry with invalid DN")
1621 except ldb.LdbError as err:
1623 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1625 def test_add_del_add(self):
1626 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1629 "objectUUID": b"0123456789abcde1"})
1630 self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1631 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1634 "objectUUID": b"0123456789abcde2"})
1636 def test_add_move_add(self):
1637 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1640 "objectUUID": b"0123456789abcde1"})
1641 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1642 "OU=DUP2,DC=SAMBA,DC=ORG")
1643 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1646 "objectUUID": b"0123456789abcde2"})
1648 def test_add_move_fail_move_move(self):
1649 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1652 "objectUUID": b"0123456789abcde1"})
1653 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1656 "objectUUID": b"0123456789abcde2"})
1658 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1659 scope=ldb.SCOPE_SUBTREE,
1660 expression="(objectUUID=0123456789abcde1)")
1661 self.assertEqual(len(res2), 1)
1662 self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1664 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1665 scope=ldb.SCOPE_SUBTREE,
1666 expression="(objectUUID=0123456789abcde2)")
1667 self.assertEqual(len(res3), 1)
1668 self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1671 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1672 "OU=DUP2,DC=SAMBA,DC=ORG")
1673 self.fail("Should have failed on duplicate DN")
1674 except ldb.LdbError as err:
1676 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1678 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1679 "OU=DUP3,DC=SAMBA,DC=ORG")
1681 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1682 "OU=DUP2,DC=SAMBA,DC=ORG")
1684 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1685 scope=ldb.SCOPE_SUBTREE,
1686 expression="(objectUUID=0123456789abcde1)")
1687 self.assertEqual(len(res2), 1)
1688 self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1690 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1691 scope=ldb.SCOPE_SUBTREE,
1692 expression="(objectUUID=0123456789abcde2)")
1693 self.assertEqual(len(res3), 1)
1694 self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
def test_move_missing(self):
    """Renaming an entry that was never added must raise
    ERR_NO_SUCH_OBJECT."""
    try:
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
    except ldb.LdbError as err:
        # The first element of the error's args is the ldb error code.
        self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_OBJECT)
    else:
        self.fail("Should have failed on missing")
1705 def test_move_missing2(self):
1706 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1709 "objectUUID": b"0123456789abcde2"})
1712 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1713 "OU=DUP2,DC=SAMBA,DC=ORG")
1714 self.fail("Should have failed on missing")
1715 except ldb.LdbError as err:
1717 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1719 def test_move_bad(self):
1720 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1723 "objectUUID": b"0123456789abcde2"})
1726 self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
1727 "OU=DUP2,DC=SAMBA,DC=ORG")
1728 self.fail("Should have failed on invalid DN")
1729 except ldb.LdbError as err:
1731 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1733 def test_move_bad2(self):
1734 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1737 "objectUUID": b"0123456789abcde2"})
1740 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1741 "OUXDUP2,DC=SAMBA,DC=ORG")
1742 self.fail("Should have failed on missing")
1743 except ldb.LdbError as err:
1745 self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1747 def test_move_fail_move_add(self):
1748 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1751 "objectUUID": b"0123456789abcde1"})
1752 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1755 "objectUUID": b"0123456789abcde2"})
1757 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1758 "OU=DUP2,DC=SAMBA,DC=ORG")
1759 self.fail("Should have failed on duplicate DN")
1760 except ldb.LdbError as err:
1762 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1764 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1765 "OU=DUP3,DC=SAMBA,DC=ORG")
1767 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1770 "objectUUID": b"0123456789abcde3"})
class AddModifyTestsLmdb(AddModifyTests):
    """AddModifyTests re-run with the MDB_PREFIX url prefix and MDB index."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        # Both attributes must be in place before the parent set-up runs.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(AddModifyTestsLmdb, self).tearDown()
1786 class IndexedAddModifyTests(AddModifyTests):
1787 """Test searches using the index, to ensure the index doesn't
1791 if not hasattr(self, 'index'):
1792 self.index = {"dn": "@INDEXLIST",
1793 "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
1795 super(IndexedAddModifyTests, self).setUp()
1797 def test_duplicate_GUID(self):
1799 self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
1802 "objectUUID": b"0123456789abcdef"})
1803 self.fail("Should have failed adding dupliate GUID")
1804 except ldb.LdbError as err:
1806 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1808 def test_duplicate_name_dup_GUID(self):
1809 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1812 "objectUUID": b"a123456789abcdef"})
1814 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1817 "objectUUID": b"a123456789abcdef"})
1818 self.fail("Should have failed adding dupliate GUID")
1819 except ldb.LdbError as err:
1821 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1823 def test_duplicate_name_dup_GUID2(self):
1824 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1827 "objectUUID": b"abc3456789abcdef"})
1829 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1832 "objectUUID": b"aaa3456789abcdef"})
1833 self.fail("Should have failed adding dupliate DN")
1834 except ldb.LdbError as err:
1836 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1838 # Checking the GUID didn't stick in the index
1839 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1842 "objectUUID": b"aaa3456789abcdef"})
1844 def test_add_dup_guid_add(self):
1845 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1848 "objectUUID": b"0123456789abcde1"})
1850 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1853 "objectUUID": b"0123456789abcde1"})
1854 self.fail("Should have failed on duplicate GUID")
1856 except ldb.LdbError as err:
1858 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1860 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1863 "objectUUID": b"0123456789abcde2"})
1865 def test_duplicate_index_values(self):
1866 self.l.add({"dn": "OU=DIV1,DC=SAMBA,DC=ORG",
1869 "objectUUID": b"0123456789abcdff"})
1870 self.l.add({"dn": "OU=DIV2,DC=SAMBA,DC=ORG",
1873 "objectUUID": b"0123456789abcdfd"})
1876 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
1877 """Test searches using the index, to ensure the index doesn't
1881 self.index = {"dn": "@INDEXLIST",
1882 "@IDXATTR": [b"x", b"y", b"ou"],
1884 "@IDXGUID": [b"objectUUID"],
1885 "@IDX_DN_GUID": [b"GUID"]}
1886 super(GUIDIndexedAddModifyTests, self).setUp()
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside a transaction.

    Each test runs between a transaction_start() issued at the end of
    set-up and a transaction_commit() issued at the start of tear-down.
    """

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Open the transaction only once the parent fixture exists.
        self.l.transaction_start()

    def tearDown(self):
        # Commit before the parent tear-down runs.
        self.l.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside a transaction.

    Each test runs between a transaction_start() issued at the end of
    set-up and a transaction_commit() issued at the start of tear-down.
    """

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Open the transaction only once the parent fixture exists.
        self.l.transaction_start()

    def tearDown(self):
        # Commit before the parent tear-down runs.
        self.l.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """GUIDIndexedAddModifyTests re-run with the MDB_PREFIX url prefix."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """GUIDTransIndexedAddModifyTests re-run with the MDB_PREFIX url
    prefix."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the run.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1937 class BadIndexTests(LdbBaseTest):
def setUp(self):
    """Create a fresh on-disk LDB and register its @INDEXLIST record.

    When the test case carries an ``IDXGUID`` attribute the index record
    also declares @IDXGUID / @IDX_DN_GUID; otherwise only @IDXATTR is
    declared.
    """
    # The parent set-up is invoked exactly once; the original repeated
    # this call as its final statement.
    super(BadIndexTests, self).setUp()
    self.testdir = tempdir()
    self.filename = os.path.join(self.testdir, "test.ldb")
    self.ldb = ldb.Ldb(self.url(), flags=self.flags())
    if hasattr(self, 'IDXGUID'):
        self.ldb.add({"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]})
    else:
        self.ldb.add({"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"]})
1954 def test_unique(self):
1955 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1956 "objectUUID": b"0123456789abcde1",
1958 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1959 "objectUUID": b"0123456789abcde2",
1961 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1962 "objectUUID": b"0123456789abcde3",
1965 res = self.ldb.search(expression="(y=1)",
1966 base="dc=samba,dc=org")
1967 self.assertEqual(len(res), 3)
1969 # Now set this to unique index, but forget to check the result
1971 self.ldb.add({"dn": "@ATTRIBUTES",
1972 "y": "UNIQUE_INDEX"})
1974 except ldb.LdbError:
1977 # We must still have a working index
1978 res = self.ldb.search(expression="(y=1)",
1979 base="dc=samba,dc=org")
1980 self.assertEqual(len(res), 3)
1982 def test_unique_transaction(self):
1983 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1984 "objectUUID": b"0123456789abcde1",
1986 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1987 "objectUUID": b"0123456789abcde2",
1989 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1990 "objectUUID": b"0123456789abcde3",
1993 res = self.ldb.search(expression="(y=1)",
1994 base="dc=samba,dc=org")
1995 self.assertEqual(len(res), 3)
1997 self.ldb.transaction_start()
1999 # Now set this to unique index, but forget to check the result
2001 self.ldb.add({"dn": "@ATTRIBUTES",
2002 "y": "UNIQUE_INDEX"})
2003 except ldb.LdbError:
2007 self.ldb.transaction_commit()
2010 except ldb.LdbError as err:
2012 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2014 # We must still have a working index
2015 res = self.ldb.search(expression="(y=1)",
2016 base="dc=samba,dc=org")
2018 self.assertEqual(len(res), 3)
2020 def test_casefold(self):
2021 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2022 "objectUUID": b"0123456789abcde1",
2024 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2025 "objectUUID": b"0123456789abcde2",
2027 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2028 "objectUUID": b"0123456789abcde3",
2031 res = self.ldb.search(expression="(y=a)",
2032 base="dc=samba,dc=org")
2033 self.assertEqual(len(res), 2)
2035 self.ldb.add({"dn": "@ATTRIBUTES",
2036 "y": "CASE_INSENSITIVE"})
2038 # We must still have a working index
2039 res = self.ldb.search(expression="(y=a)",
2040 base="dc=samba,dc=org")
2042 if hasattr(self, 'IDXGUID'):
2043 self.assertEqual(len(res), 3)
2045 # We should not return this entry twice, but sadly
2046 # we have not yet fixed
2047 # https://bugzilla.samba.org/show_bug.cgi?id=13361
2048 self.assertEqual(len(res), 4)
2050 def test_casefold_transaction(self):
2051 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2052 "objectUUID": b"0123456789abcde1",
2054 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2055 "objectUUID": b"0123456789abcde2",
2057 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2058 "objectUUID": b"0123456789abcde3",
2061 res = self.ldb.search(expression="(y=a)",
2062 base="dc=samba,dc=org")
2063 self.assertEqual(len(res), 2)
2065 self.ldb.transaction_start()
2067 self.ldb.add({"dn": "@ATTRIBUTES",
2068 "y": "CASE_INSENSITIVE"})
2070 self.ldb.transaction_commit()
2072 # We must still have a working index
2073 res = self.ldb.search(expression="(y=a)",
2074 base="dc=samba,dc=org")
2076 if hasattr(self, 'IDXGUID'):
2077 self.assertEqual(len(res), 3)
2079 # We should not return this entry twice, but sadly
2080 # we have not yet fixed
2081 # https://bugzilla.samba.org/show_bug.cgi?id=13361
2082 self.assertEqual(len(res), 4)
2084 def test_modify_transaction(self):
2085 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2086 "objectUUID": b"0123456789abcde1",
2090 res = self.ldb.search(expression="(y=2)",
2091 base="dc=samba,dc=org")
2092 self.assertEqual(len(res), 1)
2094 self.ldb.add({"dn": "@ATTRIBUTES",
2095 "y": "UNIQUE_INDEX"})
2097 self.ldb.transaction_start()
2100 m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2101 m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2102 m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2108 except ldb.LdbError as err:
2110 self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2113 self.ldb.transaction_commit()
2114 # We should fail here, but we want to be sure
2117 except ldb.LdbError as err:
2119 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2121 # The index should still be pointing to x=y
2122 res = self.ldb.search(expression="(y=2)",
2123 base="dc=samba,dc=org")
2124 self.assertEqual(len(res), 1)
2127 self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
2128 "objectUUID": b"0123456789abcde2",
2130 self.fail("Added unique attribute twice")
2131 except ldb.LdbError as err:
2133 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2135 res = self.ldb.search(expression="(y=2)",
2136 base="dc=samba,dc=org")
2137 self.assertEqual(len(res), 1)
2138 self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
2141 super(BadIndexTests, self).tearDown()
2144 class GUIDBadIndexTests(BadIndexTests):
2145 """Test Bad index things with GUID index mode"""
2150 super(GUIDBadIndexTests, self).setUp()
2153 class GUIDBadIndexTestsLmdb(BadIndexTests):
2156 if os.environ.get('HAVE_LMDB', '1') == '0':
2157 self.skipTest("No lmdb backend")
2158 self.prefix = MDB_PREFIX
2159 self.index = MDB_INDEX_OBJ
2161 super(GUIDBadIndexTestsLmdb, self).setUp()
2164 super(GUIDBadIndexTestsLmdb, self).tearDown()
2167 class BatchModeTests(LdbBaseTest):
2170 super(BatchModeTests, self).setUp()
2171 self.testdir = tempdir()
2172 self.filename = os.path.join(self.testdir, "test.ldb")
2173 self.ldb = ldb.Ldb(self.url(),
2175 options=["batch_mode:1"])
2176 if hasattr(self, 'IDXGUID'):
2177 self.ldb.add({"dn": "@INDEXLIST",
2178 "@IDXATTR": [b"x", b"y", b"ou"],
2179 "@IDXGUID": [b"objectUUID"],
2180 "@IDX_DN_GUID": [b"GUID"]})
2182 self.ldb.add({"dn": "@INDEXLIST",
2183 "@IDXATTR": [b"x", b"y", b"ou"]})
2185 def test_modify_transaction(self):
2186 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2187 "objectUUID": b"0123456789abcde1",
2191 res = self.ldb.search(expression="(y=2)",
2192 base="dc=samba,dc=org")
2193 self.assertEqual(len(res), 1)
2195 self.ldb.add({"dn": "@ATTRIBUTES",
2196 "y": "UNIQUE_INDEX"})
2198 self.ldb.transaction_start()
2201 m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2202 m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2203 m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2209 except ldb.LdbError as err:
2211 self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2214 self.ldb.transaction_commit()
2215 self.fail("Commit should have failed as we were in batch mode")
2216 except ldb.LdbError as err:
2218 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2221 super(BatchModeTests, self).tearDown()
2224 class DnTests(TestCase):
2227 super(DnTests, self).setUp()
2228 self.ldb = ldb.Ldb()
2231 super(DnTests, self).tearDown()
2234 def test_set_dn_invalid(self):
2239 self.assertRaises(TypeError, assign)
2242 x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2243 y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2244 self.assertEqual(x, y)
2245 y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
2246 self.assertNotEqual(x, y)
2249 x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
2250 self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
def test_repr(self):
    """The repr of a Dn wraps its string form as ``Dn('...')``."""
    dn = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
    rendered = repr(dn)
    self.assertEqual(rendered, "Dn('dc=foo13,bla=blie')")
def test_get_casefold_2(self):
    """get_casefold() on a DN with a ``bar`` component: the expected
    string keeps that component's value in its original case."""
    dn = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
    folded = dn.get_casefold()
    self.assertEqual(folded, "DC=FOO14,BAR=bloe")
def test_validate(self):
    """validate() is truthy for a well-formed two-component DN."""
    dn = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
    outcome = dn.validate()
    self.assertTrue(outcome)
def test_parent(self):
    """parent() of a two-component DN is the trailing component."""
    child = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
    parent = child.parent()
    self.assertEqual("bar=bloe", str(parent))
def test_parent_nonexistent(self):
    """parent() of the special DN ``@BLA`` is None."""
    special = ldb.Dn(self.ldb, "@BLA")
    parent = special.parent()
    self.assertEqual(None, parent)
def test_is_valid(self):
    """is_valid() is truthy for both a regular DN and the empty DN."""
    regular = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
    self.assertTrue(regular.is_valid())

    empty = ldb.Dn(self.ldb, "")
    self.assertTrue(empty.is_valid())
def test_is_special(self):
    """is_special() is falsy for a regular DN, truthy for ``@FOOBAR``."""
    regular = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
    self.assertFalse(regular.is_special())

    special = ldb.Dn(self.ldb, "@FOOBAR")
    self.assertTrue(special.is_special())
def test_check_special(self):
    """check_special() is falsy on a regular DN and truthy when a
    special DN is checked against its own name."""
    regular = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
    self.assertFalse(regular.check_special("FOOBAR"))

    special = ldb.Dn(self.ldb, "@FOOBAR")
    self.assertTrue(special.check_special("@FOOBAR"))
2291 x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
2292 self.assertEqual(2, len(x))
2293 x = ldb.Dn(self.ldb, "dc=foo21")
2294 self.assertEqual(1, len(x))
def test_add_child(self):
    """add_child() with a Dn argument prepends it to the DN in place."""
    dn = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
    child = ldb.Dn(self.ldb, "bla=bloe")
    added = dn.add_child(child)
    self.assertTrue(added)
    self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(dn))
def test_add_base(self):
    """add_base() with a Dn argument appends it to the DN in place."""
    dn = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
    suffix = ldb.Dn(self.ldb, "bla=bloe")
    added = dn.add_base(suffix)
    self.assertTrue(added)
    self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(dn))
def test_add_child_str(self):
    """add_child() also accepts the child as a plain string."""
    dn = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
    added = dn.add_child("bla=bloe")
    self.assertTrue(added)
    self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(dn))
2312 def test_add_base_str(self):
2313 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2315 self.assertTrue(x.add_base(base))
2316 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2319 x = ldb.Dn(self.ldb, "dc=foo24")
2320 y = ldb.Dn(self.ldb, "bar=bla")
2321 self.assertEqual("dc=foo24,bar=bla", str(x + y))
def test_remove_base_components(self):
    """Removing all but one base component leaves only the leading one."""
    dn = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
    to_drop = len(dn) - 1
    dn.remove_base_components(to_drop)
    self.assertEqual("dc=foo24", str(dn))
2328 def test_parse_ldif(self):
2329 msgs = self.ldb.parse_ldif("dn: foo=bar\n")
2331 self.assertEqual("foo=bar", str(msg[1].dn))
2332 self.assertTrue(isinstance(msg[1], ldb.Message))
2333 ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
2334 self.assertEqual("dn: foo=bar\n\n", ldif)
2336 def test_parse_ldif_more(self):
2337 msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
2339 self.assertEqual("foo=bar", str(msg[1].dn))
2341 self.assertEqual("bar=bar", str(msg[1].dn))
2343 def test_print_ldif(self):
2344 ldif = '''dn: dc=foo27
2348 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2349 self.msg["foo"] = [b"foo"]
2350 self.assertEqual(ldif,
2351 self.ldb.write_ldif(self.msg,
2352 ldb.CHANGETYPE_NONE))
2354 def test_print_ldif_binary(self):
2355 # this also confirms that ldb flags are set even without a URL)
2356 self.ldb = ldb.Ldb(flags=ldb.FLG_SHOW_BINARY)
2357 ldif = '''dn: dc=foo27
2362 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2363 self.msg["foo"] = ["f\nöö"]
2364 self.assertEqual(ldif,
2365 self.ldb.write_ldif(self.msg,
2366 ldb.CHANGETYPE_NONE))
2369 def test_print_ldif_no_base64_bad(self):
2370 ldif = '''dn: dc=foo27
2375 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2376 self.msg["foo"] = ["f\nöö"]
2377 self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2378 self.assertEqual(ldif,
2379 self.ldb.write_ldif(self.msg,
2380 ldb.CHANGETYPE_NONE))
2382 def test_print_ldif_no_base64_good(self):
2383 ldif = '''dn: dc=foo27
2387 self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2388 self.msg["foo"] = ["föö"]
2389 self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2390 self.assertEqual(ldif,
2391 self.ldb.write_ldif(self.msg,
2392 ldb.CHANGETYPE_NONE))
def test_canonical_string(self):
    """canonical_str() of ``dc=foo25,bar=bloe`` is ``/bloe/foo25``."""
    dn = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
    canonical = dn.canonical_str()
    self.assertEqual("/bloe/foo25", canonical)
def test_canonical_ex_string(self):
    """canonical_ex_str() puts a newline before the final component."""
    dn = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
    canonical = dn.canonical_ex_str()
    self.assertEqual("/bloe\nfoo26", canonical)
def test_ldb_is_child_of(self):
    """Exercise Dn.is_child_of() with Dn arguments.

    A DN counts as a child of itself and of any of its ancestors, but
    not of a sibling or of one of its own descendants.
    """
    base = ldb.Dn(self.ldb, "dc=base")
    foo = ldb.Dn(self.ldb, "cn=foo,dc=base")
    bar = ldb.Dn(self.ldb, "cn=bar,dc=base")
    baz = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

    # (candidate, ancestor, expected) triples, checked in order.
    cases = [
        (base, base, True),
        (foo, base, True),
        (baz, base, True),
        (baz, bar, True),
        (baz, baz, True),
        (bar, foo, False),
        (base, baz, False),
    ]
    for candidate, ancestor, expected in cases:
        check = self.assertTrue if expected else self.assertFalse
        check(candidate.is_child_of(ancestor))
2417 def test_ldb_is_child_of_str(self):
2418 """Testing ldb_dn_compare_dn"""
2420 dn2_str = "cn=foo,dc=base"
2421 dn3_str = "cn=bar,dc=base"
2422 dn4_str = "cn=baz,cn=bar,dc=base"
2424 dn1 = ldb.Dn(self.ldb, dn1_str)
2425 dn2 = ldb.Dn(self.ldb, dn2_str)
2426 dn3 = ldb.Dn(self.ldb, dn3_str)
2427 dn4 = ldb.Dn(self.ldb, dn4_str)
2429 self.assertTrue(dn1.is_child_of(dn1_str))
2430 self.assertTrue(dn2.is_child_of(dn1_str))
2431 self.assertTrue(dn4.is_child_of(dn1_str))
2432 self.assertTrue(dn4.is_child_of(dn3_str))
2433 self.assertTrue(dn4.is_child_of(dn4_str))
2434 self.assertFalse(dn3.is_child_of(dn2_str))
2435 self.assertFalse(dn1.is_child_of(dn4_str))
def test_get_component_name(self):
    """get_component_name() returns the attribute name per component
    index, and None for indexes outside the DN."""
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
    expectations = [(0, 'cn'), (1, 'dc'), (2, None), (-1, None)]
    for index, expected in expectations:
        self.assertEqual(dn.get_component_name(index), expected)
def test_get_component_value(self):
    """get_component_value() returns the value per component index,
    and None for indexes outside the DN.

    The out-of-range checks call get_component_value() itself; the
    original called get_component_name() there, which left the value
    accessor untested for bad indexes.
    """
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
    self.assertEqual(dn.get_component_value(0), 'foo')
    self.assertEqual(dn.get_component_value(1), 'base')
    # Indexes past the end (and -1) are expected to yield None.
    self.assertEqual(dn.get_component_value(2), None)
    self.assertEqual(dn.get_component_value(-1), None)
def test_set_component(self):
    """set_component() rewrites one component in place.

    Covers replacing a value, replacing name and value, an out-of-range
    index (TypeError, DN untouched) and a value needing escaping.
    """
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")

    dn.set_component(0, 'cn', 'bar')
    self.assertEqual(str(dn), "cn=bar,dc=base")

    dn.set_component(1, 'o', 'asep')
    self.assertEqual(str(dn), "cn=bar,o=asep")

    # Index 2 does not exist in a two-component DN.
    self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
    self.assertEqual(str(dn), "cn=bar,o=asep")

    # ',' and '+' come back escaped in the string form.
    dn.set_component(1, 'o', 'a,b+c')
    self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
def test_set_component_bytes(self):
    """set_component() accepts the component value as bytes."""
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")

    dn.set_component(0, 'cn', b'bar')
    self.assertEqual(str(dn), "cn=bar,dc=base")

    dn.set_component(1, 'o', b'asep')
    self.assertEqual(str(dn), "cn=bar,o=asep")
def test_set_component_none(self):
    """set_component() raises TypeError when the value is None."""
    dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
    setter = dn.set_component
    self.assertRaises(TypeError, setter, 1, 'cn', None)
def test_get_extended_component_null(self):
    """Asking a DN without extended components for one returns None."""
    plain_dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
    component = plain_dn.get_extended_component("TEST")
    self.assertEqual(component, None)
def test_get_extended_component(self):
    """An extended component written in the DN string is read back as bytes."""
    self.ldb._register_test_extensions()
    extended_dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
    component = extended_dn.get_extended_component("TEST")
    self.assertEqual(component, b"foo")
def test_set_extended_component(self):
    """Extended components can be set from text or bytes; reads give bytes."""
    self.ldb._register_test_extensions()
    target = ldb.Dn(self.ldb, "dc=base")
    for new_value, stored in (("foo", b"foo"), (b"bar", b"bar")):
        target.set_extended_component("TEST", new_value)
        self.assertEqual(target.get_extended_component("TEST"), stored)
def test_extended_str(self):
    """extended_str() reproduces a DN string that has an extended component."""
    self.ldb._register_test_extensions()
    text = "<TEST=foo>;cn=bar,dc=base"
    parsed = ldb.Dn(self.ldb, text)
    self.assertEqual(parsed.extended_str(), text)
def test_get_rdn_name(self):
    """get_rdn_name() gives 'cn' for the DN cn=foo,dc=base."""
    rdn_name = ldb.Dn(self.ldb, "cn=foo,dc=base").get_rdn_name()
    self.assertEqual(rdn_name, 'cn')
def test_get_rdn_value(self):
    """get_rdn_value() gives 'foo' for the DN cn=foo,dc=base."""
    rdn_value = ldb.Dn(self.ldb, "cn=foo,dc=base").get_rdn_value()
    self.assertEqual(rdn_value, 'foo')
def test_get_casefold(self):
    """get_casefold() returns the upper-cased form of a lower-case DN."""
    folded = ldb.Dn(self.ldb, "cn=foo,dc=base").get_casefold()
    self.assertEqual(folded, 'CN=FOO,DC=BASE')
def test_get_linearized(self):
    """get_linearized() returns the string the DN was built from."""
    text = 'cn=foo,dc=base'
    self.assertEqual(ldb.Dn(self.ldb, text).get_linearized(), text)
def test_is_null(self):
    """is_null() is false for a populated DN and true for the empty DN."""
    populated = ldb.Dn(self.ldb, "cn=foo,dc=base")
    self.assertFalse(populated.is_null())

    empty = ldb.Dn(self.ldb, '')
    self.assertTrue(empty.is_null())
2519 class LdbMsgTests(TestCase):
2522 super(LdbMsgTests, self).setUp()
2523 self.msg = ldb.Message()
def test_init_dn(self):
    """A Message constructed from a Dn exposes it through .dn."""
    initial_dn = ldb.Dn(ldb.Ldb(), "dc=foo27")
    self.msg = ldb.Message(initial_dn)
    self.assertEqual("dc=foo27", str(self.msg.dn))
def test_iter_items(self):
    """items() is empty at first and has one entry once the dn is set."""
    before = self.msg.items()
    self.assertEqual(0, len(before))
    self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
    after = self.msg.items()
    self.assertEqual(1, len(after))
2534 def test_repr(self):
2535 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2536 self.msg["dc"] = b"foo"
2538 self.assertIn(repr(self.msg), [
2539 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2540 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2542 self.assertIn(repr(self.msg.text), [
2543 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2544 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2547 self.assertIn(repr(self.msg), [
2548 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})",
2549 "Message({'dc': MessageElement(['foo']), 'dn': Dn('dc=foo29')})",
2551 self.assertIn(repr(self.msg.text), [
2552 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text",
2553 "Message({'dc': MessageElement(['foo']), 'dn': Dn('dc=foo29')}).text",
2557 self.assertEqual(0, len(self.msg))
def test_notpresent(self):
    """Indexing the message by an attribute it does not hold raises KeyError."""
    with self.assertRaises(KeyError):
        self.msg["foo"]
2566 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
def test_add_text(self):
    """add() accepts an element whose value was given as text."""
    element = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
    self.msg.add(element)
def test_elements_empty(self):
    """elements() is an empty list when nothing has been added."""
    found = self.msg.elements()
    self.assertEqual([], found)
2574 def test_elements(self):
2575 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2577 self.assertEqual([el], self.msg.elements())
2578 self.assertEqual([el.text], self.msg.text.elements())
def test_add_value(self):
    """Assigning a one-item bytes list adds exactly one attribute."""
    self.assertEqual(0, len(self.msg))
    values = [b"foo"]
    self.msg["foo"] = values
    self.assertEqual(1, len(self.msg))
def test_add_value_text(self):
    """Assigning a one-item text list adds exactly one attribute."""
    self.assertEqual(0, len(self.msg))
    values = ["foo"]
    self.msg["foo"] = values
    self.assertEqual(1, len(self.msg))
def test_add_value_multiple(self):
    """A two-item bytes list is stored as one attribute holding both values."""
    values = [b"foo", b"bla"]
    self.assertEqual(0, len(self.msg))
    self.msg["foo"] = values
    self.assertEqual(1, len(self.msg))
    stored = list(self.msg["foo"])
    self.assertEqual(values, stored)
def test_add_value_multiple_text(self):
    """A two-item text list is stored as one attribute, readable via .text."""
    values = ["foo", "bla"]
    self.assertEqual(0, len(self.msg))
    self.msg["foo"] = values
    self.assertEqual(1, len(self.msg))
    stored = list(self.msg.text["foo"])
    self.assertEqual(values, stored)
def test_set_value(self):
    """Re-assigning an attribute replaces its previous bytes value."""
    for value in (b"fool", b"bar"):
        self.msg["foo"] = [value]
        self.assertEqual([value], list(self.msg["foo"]))
def test_set_value_text(self):
    """Re-assigning an attribute replaces its previous text value."""
    for value in ("fool", "bar"):
        self.msg["foo"] = [value]
        self.assertEqual([value], list(self.msg.text["foo"]))
def test_keys(self):
    """keys() lists "dn" first, then attributes in the order they were set."""
    self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
    for attr in ("foo", "bar"):
        self.msg[attr] = [b"bla"]
    names = list(self.msg.keys())
    self.assertEqual(["dn", "foo", "bar"], names)
def test_keys_text(self):
    """The text view's keys() lists "dn" first, then attributes in set order."""
    self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
    for attr in ("foo", "bar"):
        self.msg[attr] = ["bla"]
    names = list(self.msg.text.keys())
    self.assertEqual(["dn", "foo", "bar"], names)
2627 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2628 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
def test_get_dn(self):
    """get("dn") returns the DN that was assigned to the message."""
    self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
    fetched = self.msg.get("dn")
    self.assertEqual("@BASEINFO", str(fetched))
def test_dn_text(self):
    """A DN assigned through the text view is visible on both views."""
    self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
    for view in (self.msg, self.msg.text):
        self.assertEqual("@BASEINFO", str(view.dn))
def test_get_dn_text(self):
    """get("dn") returns the assigned DN on both the bytes and text views."""
    self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
    for view in (self.msg, self.msg.text):
        self.assertEqual("@BASEINFO", str(view.get("dn")))
def test_get_invalid(self):
    """get() with a non-string key raises TypeError."""
    self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
    with self.assertRaises(TypeError):
        self.msg.get(42)
def test_get_other(self):
    """get() on a present attribute, with and without idx and default."""
    self.msg["foo"] = [b"bar"]

    whole_element = self.msg.get("foo")
    self.assertEqual(b"bar", whole_element[0])

    first_value = self.msg.get("foo", idx=0)
    self.assertEqual(b"bar", first_value)

    # idx=1 is past the only value: None unless a default is supplied.
    past_end = self.msg.get("foo", idx=1)
    self.assertEqual(None, past_end)
    past_end_with_default = self.msg.get("foo", default='', idx=1)
    self.assertEqual("", past_end_with_default)
def test_get_other_text(self):
    """Exercise the text view's get() on a present attribute."""
    self.msg["foo"] = ["bar"]
    self.assertEqual(["bar"], list(self.msg.text.get("foo")))
    self.assertEqual("bar", self.msg.text.get("foo")[0])
    self.assertEqual("bar", self.msg.text.get("foo", idx=0))
    # The out-of-range checks now go through .text as well; they used to
    # call the bytes interface, which test_get_other already covers.
    self.assertEqual(None, self.msg.text.get("foo", idx=1))
    self.assertEqual("", self.msg.text.get("foo", default='', idx=1))
def test_get_default(self):
    """get() on an absent attribute gives None, or the supplied default."""
    missing = "tatayoyo"
    self.assertEqual(None, self.msg.get(missing, idx=0))
    fallback = "anniecordie"
    self.assertEqual(fallback, self.msg.get(missing, fallback))
def test_get_default_text(self):
    """Text view: get() on an absent attribute gives None or the default."""
    missing = "tatayoyo"
    self.assertEqual(None, self.msg.text.get(missing, idx=0))
    fallback = "anniecordie"
    self.assertEqual(fallback, self.msg.text.get(missing, fallback))
def test_get_unknown(self):
    """get() on an absent attribute with no default returns None."""
    result = self.msg.get("lalalala")
    self.assertEqual(None, result)
def test_get_unknown_text(self):
    """Text view: get() on an absent attribute with no default returns None."""
    result = self.msg.text.get("lalalala")
    self.assertEqual(None, result)
2677 def test_msg_diff(self):
2679 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2680 msg1 = next(msgs)[1]
2681 msg2 = next(msgs)[1]
2682 msgdiff = l.msg_diff(msg1, msg2)
2683 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2684 self.assertRaises(KeyError, lambda: msgdiff["foo"])
2685 self.assertEqual(1, len(msgdiff))
def test_equal_empty(self):
    """Two freshly constructed messages compare equal."""
    left, right = ldb.Message(), ldb.Message()
    self.assertEqual(left, right)
def test_equal_simplel(self):
    """Messages compare equal only while their DN and values match."""
    # Context used to build both DNs.
    db = ldb.Ldb()
    msg1 = ldb.Message()
    msg1.dn = ldb.Dn(db, "foo=bar")
    msg2 = ldb.Message()
    msg2.dn = ldb.Dn(db, "foo=bar")
    self.assertEqual(msg1, msg2)
    msg1['foo'] = b'bar'
    msg2['foo'] = b'bar'
    self.assertEqual(msg1, msg2)
    # Diverging one attribute value must break equality.  (A second,
    # identical assignment that followed this check asserted nothing and
    # has been dropped.)
    msg2['foo'] = b'blie'
    self.assertNotEqual(msg1, msg2)
2706 def test_from_dict(self):
2707 rec = {"dn": "dc=fromdict",
2708 "a1": [b"a1-val1", b"a1-val1"]}
2710 # check different types of input Flags
2711 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2712 m = ldb.Message.from_dict(l, rec, flags)
2713 self.assertEqual(rec["a1"], list(m["a1"]))
2714 self.assertEqual(flags, m["a1"].flags())
2715 # check input params
2716 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2717 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2718 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2719 # Message.from_dict expects dictionary with 'dn'
2720 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2721 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2723 def test_from_dict_text(self):
2724 rec = {"dn": "dc=fromdict",
2725 "a1": ["a1-val1", "a1-val1"]}
2727 # check different types of input Flags
2728 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2729 m = ldb.Message.from_dict(l, rec, flags)
2730 self.assertEqual(rec["a1"], list(m.text["a1"]))
2731 self.assertEqual(flags, m.text["a1"].flags())
2732 # check input params
2733 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2734 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2735 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2736 # Message.from_dict expects dictionary with 'dn'
2737 err_rec = {"a1": ["a1-val1", "a1-val1"]}
2738 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2740 def test_copy_add_message_element(self):
2742 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2743 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2747 self.assertEqual(mto["1"], m["1"])
2748 self.assertEqual(mto["2"], m["2"])
2752 self.assertEqual(mto["1"], m["1"])
2753 self.assertEqual(mto["2"], m["2"])
2755 def test_copy_add_message_element_text(self):
2757 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2758 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2762 self.assertEqual(mto["1"], m.text["1"])
2763 self.assertEqual(mto["2"], m.text["2"])
2767 self.assertEqual(mto.text["1"], m.text["1"])
2768 self.assertEqual(mto.text["2"], m.text["2"])
2769 self.assertEqual(mto["1"], m["1"])
2770 self.assertEqual(mto["2"], m["2"])
2773 class MessageElementTests(TestCase):
def test_cmp_element(self):
    """Elements compare equal when their values match, unequal otherwise."""
    reference = ldb.MessageElement([b"foo"])
    same = ldb.MessageElement([b"foo"])
    different = ldb.MessageElement([b"bzr"])
    self.assertEqual(reference, same)
    self.assertNotEqual(reference, different)
def test_cmp_element_text(self):
    """An element built from bytes equals one built from the same text."""
    from_bytes = ldb.MessageElement([b"foo"])
    from_text = ldb.MessageElement(["foo"])
    self.assertEqual(from_bytes, from_text)
def test_create_iterable(self):
    """Iterating an element gives bytes; iterating its .text gives str."""
    element = ldb.MessageElement([b"foo"])
    as_bytes = list(element)
    self.assertEqual([b"foo"], as_bytes)
    as_text = list(element.text)
    self.assertEqual(["foo"], as_text)
2792 def test_repr(self):
2793 x = ldb.MessageElement([b"foo"])
2795 self.assertEqual("MessageElement([b'foo'])", repr(x))
2796 self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
2798 self.assertEqual("MessageElement(['foo'])", repr(x))
2799 self.assertEqual("MessageElement(['foo']).text", repr(x.text))
2800 x = ldb.MessageElement([b"foo", b"bla"])
2801 self.assertEqual(2, len(x))
2803 self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
2804 self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
2806 self.assertEqual("MessageElement(['foo','bla'])", repr(x))
2807 self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
def test_get_item(self):
    """Index an element by position, including -1 and an out-of-range index."""
    element = ldb.MessageElement([b"foo", b"bar"])
    for index, expected in ((0, b"foo"), (1, b"bar"), (-1, b"bar")):
        self.assertEqual(expected, element[index])
    with self.assertRaises(IndexError):
        element[45]
def test_get_item_text(self):
    """Index an element's text view, including an out-of-range index."""
    x = ldb.MessageElement(["foo", "bar"])
    self.assertEqual("foo", x.text[0])
    self.assertEqual("bar", x.text[1])
    self.assertEqual("bar", x.text[-1])
    self.assertRaises(IndexError, lambda: x[45])
    # The check above only touches the bytes element; repeat it on the
    # text view, which is what this test is about.
    self.assertRaises(IndexError, lambda: x.text[45])
2824 x = ldb.MessageElement([b"foo", b"bar"])
2825 self.assertEqual(2, len(x))
2828 x = ldb.MessageElement([b"foo", b"bar"])
2829 y = ldb.MessageElement([b"foo", b"bar"])
2830 self.assertEqual(y, x)
2831 x = ldb.MessageElement([b"foo"])
2832 self.assertNotEqual(y, x)
2833 y = ldb.MessageElement([b"foo"])
2834 self.assertEqual(y, x)
2836 def test_extended(self):
2837 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2839 self.assertEqual("MessageElement([b'456'])", repr(el))
2840 self.assertEqual("MessageElement([b'456']).text", repr(el.text))
2842 self.assertEqual("MessageElement(['456'])", repr(el))
2843 self.assertEqual("MessageElement(['456']).text", repr(el.text))
def test_bad_text(self):
    """Reading an undecodable value through .text raises UnicodeDecodeError."""
    undecodable = ldb.MessageElement(b'\xba\xdd')
    with self.assertRaises(UnicodeDecodeError):
        undecodable.text[0]
2850 class ModuleTests(TestCase):
2853 super(ModuleTests, self).setUp()
2854 self.testdir = tempdir()
2855 self.filename = os.path.join(self.testdir, "test.ldb")
2856 self.ldb = ldb.Ldb(self.filename)
shutil.rmtree(self.testdir)
# Chain to the parent's tearDown(); this line used to call setUp() again,
# so the base-class teardown never ran.
super(ModuleTests, self).tearDown()
2862 def test_register_module(self):
2863 class ExampleModule:
2865 ldb.register_module(ExampleModule)
2867 def test_use_module(self):
2870 class ExampleModule:
2873 def __init__(self, ldb, next):
2877 def search(self, *args, **kwargs):
2878 return self.next.search(*args, **kwargs)
2880 def request(self, *args, **kwargs):
2883 ldb.register_module(ExampleModule)
2884 l = ldb.Ldb(self.filename)
2885 l.add({"dn": "@MODULES", "@LIST": "bla"})
2886 self.assertEqual([], ops)
2887 l = ldb.Ldb(self.filename)
2888 self.assertEqual(["init"], ops)
2891 class LdbResultTests(LdbBaseTest):
2894 super(LdbResultTests, self).setUp()
2895 self.testdir = tempdir()
2896 self.filename = os.path.join(self.testdir, "test.ldb")
2897 self.l = ldb.Ldb(self.url(), flags=self.flags())
2899 self.l.add(self.index)
2900 except AttributeError:
2902 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
2903 "objectUUID": b"0123456789abcde0"})
2904 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
2905 "objectUUID": b"0123456789abcde1"})
2906 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
2907 "objectUUID": b"0123456789abcde2"})
2908 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
2909 "objectUUID": b"0123456789abcde3"})
2910 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
2911 "objectUUID": b"0123456789abcde4"})
2912 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
2913 "objectUUID": b"0123456789abcde5"})
2914 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
2915 "objectUUID": b"0123456789abcde6"})
2916 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
2917 "objectUUID": b"0123456789abcde7"})
2918 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
2919 "objectUUID": b"0123456789abcde8"})
2920 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
2921 "objectUUID": b"0123456789abcde9"})
2922 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
2923 "objectUUID": b"0123456789abcdea"})
2924 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
2925 "objectUUID": b"0123456789abcdeb"})
2926 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
2927 "objectUUID": b"0123456789abcdec"})
2930 shutil.rmtree(self.testdir)
2931 super(LdbResultTests, self).tearDown()
2932 # Ensure the LDB is closed now, so we close the FD
def test_return_type(self):
    """The string form of a search result is "<ldb result>"."""
    rendered = str(self.l.search())
    self.assertEqual(rendered, "<ldb result>")
2939 def test_get_msgs(self):
2940 res = self.l.search()
2943 def test_get_controls(self):
2944 res = self.l.search()
2947 def test_get_referals(self):
2948 res = self.l.search()
2951 def test_iter_msgs(self):
2953 for l in self.l.search().msgs:
2954 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2956 self.assertTrue(found)
def test_iter_msgs_count(self):
    """A search result's count reflects the number of entries found."""
    # assertGreater reports both operands on failure, unlike assertTrue.
    self.assertGreater(self.l.search().count, 0)
    # 13 objects have been added under DC=SAMBA,DC=ORG
    self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2963 def test_iter_controls(self):
2964 res = self.l.search().controls
def test_create_control(self):
    """An unknown control string raises ValueError; "relax:1" is accepted."""
    with self.assertRaises(ValueError):
        ldb.Control(self.l, "tatayoyo:0")
    relax = ldb.Control(self.l, "relax:1")
    self.assertEqual(relax.critical, True)
    self.assertEqual(relax.oid, "1.3.6.1.4.1.4203.666.5.12")
2973 def test_iter_refs(self):
2974 res = self.l.search().referals
2977 def test_search_sequence_msgs(self):
2979 res = self.l.search().msgs
2981 for i in range(0, len(res)):
2983 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2985 self.assertTrue(found)
2987 def test_search_as_iter(self):
2989 res = self.l.search()
2992 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2994 self.assertTrue(found)
2996 def test_search_iter(self):
2998 res = self.l.search_iterator()
3001 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3003 self.assertTrue(found)
3005 # Show that search results can't see into a transaction
3007 def test_search_against_trans(self):
3010 (r1, w1) = os.pipe()
3012 (r2, w2) = os.pipe()
3014 # For the first element, fork a child that will
3018 # In the child, re-open
3022 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3023 # start a transaction
3024 child_ldb.transaction_start()
3027 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3028 "name": b"samba.org",
3029 "objectUUID": b"o123456789acbdef"})
3031 os.write(w1, b"added")
3033 # Now wait for the search to be done
3038 child_ldb.transaction_commit()
3039 except ldb.LdbError as err:
3040 # We print this here to see what went wrong in the child
3044 os.write(w1, b"transaction")
3047 self.assertEqual(os.read(r1, 5), b"added")
3049 # This should not turn up until the transaction is concluded
3050 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3051 scope=ldb.SCOPE_BASE)
3052 self.assertEqual(len(res11), 0)
3054 os.write(w2, b"search")
3056 # Now wait for the transaction to be done. This should
3057 # deadlock, but the search doesn't hold a read lock for the
3058 # iterator lifetime currently.
3059 self.assertEqual(os.read(r1, 11), b"transaction")
3061 # This should now turn up, as the transaction is over
3062 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3063 scope=ldb.SCOPE_BASE)
3064 self.assertEqual(len(res11), 1)
3066 self.assertFalse(found11)
3068 (got_pid, status) = os.waitpid(pid, 0)
3069 self.assertEqual(got_pid, pid)
3071 def test_search_iter_against_trans(self):
3075 # We need to hold this iterator open to hold the all-record
3077 res = self.l.search_iterator()
3079 (r1, w1) = os.pipe()
3081 (r2, w2) = os.pipe()
3083 # For the first element, with the sequence open (which
3084 # means with ldb locks held), fork a child that will
3088 # In the child, re-open
3093 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3094 # start a transaction
3095 child_ldb.transaction_start()
3098 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3099 "name": b"samba.org",
3100 "objectUUID": b"o123456789acbdef"})
3102 os.write(w1, b"added")
3104 # Now wait for the search to be done
3109 child_ldb.transaction_commit()
3110 except ldb.LdbError as err:
3111 # We print this here to see what went wrong in the child
3115 os.write(w1, b"transaction")
3118 self.assertEqual(os.read(r1, 5), b"added")
3120 # This should not turn up until the transaction is concluded
3121 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3122 scope=ldb.SCOPE_BASE)
3123 self.assertEqual(len(res11), 0)
3125 os.write(w2, b"search")
3127 # allow the transaction to start
3130 # This should not turn up until the search finishes and
3131 # removed the read lock, but for ldb_tdb that happened as soon
3132 # as we called the first res.next()
3133 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3134 scope=ldb.SCOPE_BASE)
3135 self.assertEqual(len(res11), 0)
3137 # These results are all collected at the first next(res) call
3139 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3141 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
3144 # Now wait for the transaction to be done.
3145 self.assertEqual(os.read(r1, 11), b"transaction")
3147 # This should now turn up, as the transaction is over and all
3148 # read locks are gone
3149 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3150 scope=ldb.SCOPE_BASE)
3151 self.assertEqual(len(res11), 1)
3153 self.assertTrue(found)
3154 self.assertFalse(found11)
3156 (got_pid, status) = os.waitpid(pid, 0)
3157 self.assertEqual(got_pid, pid)
3160 class LdbResultTestsLmdb(LdbResultTests):
3163 if os.environ.get('HAVE_LMDB', '1') == '0':
3164 self.skipTest("No lmdb backend")
3165 self.prefix = MDB_PREFIX
3166 self.index = MDB_INDEX_OBJ
3167 super(LdbResultTestsLmdb, self).setUp()
3170 super(LdbResultTestsLmdb, self).tearDown()
3173 class BadTypeTests(TestCase):
def test_control(self):
    """ldb.Control() rejects a non-Ldb context and non-string control data."""
    l = ldb.Ldb()
    self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
    # Use a real Ldb as the context here.  Passing the `ldb` module made
    # the call fail on its first argument, so the bad data argument
    # (1234) was never what triggered the TypeError.
    self.assertRaises(TypeError, ldb.Control, l, 1234)
3179 def test_modify(self):
3181 dn = ldb.Dn(l, 'a=b')
3183 self.assertRaises(TypeError, l.modify, '<bad type>')
3184 self.assertRaises(TypeError, l.modify, m, '<bad type>')
3188 dn = ldb.Dn(l, 'a=b')
3190 self.assertRaises(TypeError, l.add, '<bad type>')
3191 self.assertRaises(TypeError, l.add, m, '<bad type>')
3193 def test_delete(self):
3195 dn = ldb.Dn(l, 'a=b')
3196 self.assertRaises(TypeError, l.add, '<bad type>')
3197 self.assertRaises(TypeError, l.add, dn, '<bad type>')
3199 def test_rename(self):
3201 dn = ldb.Dn(l, 'a=b')
3202 self.assertRaises(TypeError, l.add, '<bad type>', dn)
3203 self.assertRaises(TypeError, l.add, dn, '<bad type>')
3204 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
3206 def test_search(self):
3208 self.assertRaises(TypeError, l.search, base=1234)
3209 self.assertRaises(TypeError, l.search, scope='<bad type>')
3210 self.assertRaises(TypeError, l.search, expression=1234)
3211 self.assertRaises(TypeError, l.search, attrs='<bad type>')
3212 self.assertRaises(TypeError, l.search, controls='<bad type>')
class VersionTests(TestCase):
    """Checks on the version metadata exported by the ldb module."""

    def test_version(self):
        """ldb.__version__ is exposed as a str."""
        # assertIsInstance names the actual type in its failure message.
        self.assertIsInstance(ldb.__version__, str)
3220 class NestedTransactionTests(LdbBaseTest):
super(NestedTransactionTests, self).setUp()
self.testdir = tempdir()
self.filename = os.path.join(self.testdir, "test.ldb")
self.ldb = ldb.Ldb(self.url(), flags=self.flags())
# Install the @INDEXLIST record before any test data is added.
self.ldb.add({"dn": "@INDEXLIST",
              "@IDXATTR": [b"x", b"y", b"ou"],
              "@IDXGUID": [b"objectUUID"],
              "@IDX_DN_GUID": [b"GUID"]})
# The parent setUp() is chained once, at the top; a second identical
# call that used to follow the add() has been removed.
3234 # This test documents that currently ldb does not support true nested
3237 # Note: The test is written so that it treats failure as pass.
3238 # It is done this way as standalone ldb builds do not use the samba
3239 # known fail mechanism
3241 def test_nested_transactions(self):
3243 self.ldb.transaction_start()
3245 self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
3246 "objectUUID": b"0123456789abcde1"})
3247 res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3248 base="dc=samba,dc=org")
3249 self.assertEqual(len(res), 1)
3251 self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
3252 "objectUUID": b"0123456789abcde2"})
3253 res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3254 base="dc=samba,dc=org")
3255 self.assertEqual(len(res), 1)
3257 self.ldb.transaction_start()
3258 self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
3259 "objectUUID": b"0123456789abcde3"})
3260 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3261 base="dc=samba,dc=org")
3262 self.assertEqual(len(res), 1)
3263 self.ldb.transaction_cancel()
3265 # Check that we can not see the record added by the cancelled
3267 # Currently this fails as ldb does not support true nested
3268 # transactions, and only the outer commits and cancels have an effect
3270 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3271 base="dc=samba,dc=org")
3273 # FIXME this test currently passes on a failure, i.e. if nested
3274 # transaction support worked correctly the correct test would
3276 # self.assertEqual(len(res), 0)
3277 # as the add of objectUUID=0123456789abcde3 would be reverted when
3278 # the sub transaction it was nested in was rolled back.
3280 # Currently this is not the case so the record is still present.
3281 self.assertEqual(len(res), 1)
3284 # Commit the outer transaction
3286 self.ldb.transaction_commit()
3288 # Now check we can still see the records added in the outer
3291 res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3292 base="dc=samba,dc=org")
3293 self.assertEqual(len(res), 1)
3294 res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3295 base="dc=samba,dc=org")
3296 self.assertEqual(len(res), 1)
3298 # And that we can't see the records added by the nested transaction.
3300 res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3301 base="dc=samba,dc=org")
3302 # FIXME again if nested transactions worked correctly we would not
3303 # see this record. The test should be.
3304 # self.assertEqual(len(res), 0)
3305 self.assertEqual(len(res), 1)
3308 super(NestedTransactionTests, self).tearDown()
3311 class LmdbNestedTransactionTests(NestedTransactionTests):
3314 if os.environ.get('HAVE_LMDB', '1') == '0':
3315 self.skipTest("No lmdb backend")
3316 self.prefix = MDB_PREFIX
3317 self.index = MDB_INDEX_OBJ
3318 super(LmdbNestedTransactionTests, self).setUp()
3321 super(LmdbNestedTransactionTests, self).tearDown()
3324 if __name__ == '__main__':
3326 unittest.TestProgram()