2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
13 PY3 = sys.version_info > (3, 0)
19 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
22 return tempfile.mkdtemp(dir=dir_prefix)
25 class NoContextTests(TestCase):
27 def test_valid_attr_name(self):
28 self.assertTrue(ldb.valid_attr_name("foo"))
29 self.assertFalse(ldb.valid_attr_name("24foo"))
31 def test_timestring(self):
32 self.assertEqual("19700101000000.0Z", ldb.timestring(0))
33 self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
35 def test_string_to_time(self):
36 self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
37 self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
39 def test_binary_encode(self):
40 encoded = ldb.binary_encode(b'test\\x')
41 decoded = ldb.binary_decode(encoded)
42 self.assertEqual(decoded, b'test\\x')
44 encoded2 = ldb.binary_encode('test\\x')
45 self.assertEqual(encoded2, encoded)
47 class SimpleLdb(TestCase):
50 super(SimpleLdb, self).setUp()
51 self.testdir = tempdir()
52 self.filename = os.path.join(self.testdir, "test.ldb")
53 self.ldb = ldb.Ldb(self.filename)
56 shutil.rmtree(self.testdir)
57 super(SimpleLdb, self).tearDown()
59 def test_connect(self):
60 ldb.Ldb(self.filename)
62 def test_connect_none(self):
65 def test_connect_later(self):
67 x.connect(self.filename)
71 self.assertTrue(repr(x).startswith("<ldb connection"))
73 def test_set_create_perms(self):
75 x.set_create_perms(0o600)
77 def test_modules_none(self):
79 self.assertEqual([], x.modules())
81 def test_modules_tdb(self):
82 x = ldb.Ldb(self.filename)
83 self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
85 def test_firstmodule_none(self):
87 self.assertEqual(x.firstmodule, None)
89 def test_firstmodule_tdb(self):
90 x = ldb.Ldb(self.filename)
92 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
94 def test_search(self):
95 l = ldb.Ldb(self.filename)
96 self.assertEqual(len(l.search()), 0)
98 def test_search_controls(self):
99 l = ldb.Ldb(self.filename)
100 self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
102 def test_search_attrs(self):
103 l = ldb.Ldb(self.filename)
104 self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
106 def test_search_string_dn(self):
107 l = ldb.Ldb(self.filename)
108 self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
110 def test_search_attr_string(self):
111 l = ldb.Ldb(self.filename)
112 self.assertRaises(TypeError, l.search, attrs="dc")
113 self.assertRaises(TypeError, l.search, attrs=b"dc")
115 def test_opaque(self):
116 l = ldb.Ldb(self.filename)
117 l.set_opaque("my_opaque", l)
118 self.assertTrue(l.get_opaque("my_opaque") is not None)
119 self.assertEqual(None, l.get_opaque("unknown"))
121 def test_search_scope_base_empty_db(self):
122 l = ldb.Ldb(self.filename)
123 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
126 def test_search_scope_onelevel_empty_db(self):
127 l = ldb.Ldb(self.filename)
128 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
129 ldb.SCOPE_ONELEVEL)), 0)
131 def test_delete(self):
132 l = ldb.Ldb(self.filename)
133 self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
135 def test_delete_w_unhandled_ctrl(self):
136 l = ldb.Ldb(self.filename)
138 m.dn = ldb.Dn(l, "dc=foo1")
141 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
144 def test_contains(self):
147 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
150 m.dn = ldb.Dn(l, "dc=foo3")
154 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
155 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
159 def test_get_config_basedn(self):
160 l = ldb.Ldb(self.filename)
161 self.assertEqual(None, l.get_config_basedn())
163 def test_get_root_basedn(self):
164 l = ldb.Ldb(self.filename)
165 self.assertEqual(None, l.get_root_basedn())
167 def test_get_schema_basedn(self):
168 l = ldb.Ldb(self.filename)
169 self.assertEqual(None, l.get_schema_basedn())
171 def test_get_default_basedn(self):
172 l = ldb.Ldb(self.filename)
173 self.assertEqual(None, l.get_default_basedn())
176 l = ldb.Ldb(self.filename)
178 m.dn = ldb.Dn(l, "dc=foo4")
180 self.assertEqual(len(l.search()), 0)
183 self.assertEqual(len(l.search()), 1)
185 l.delete(ldb.Dn(l, "dc=foo4"))
187 def test_search_iterator(self):
188 l = ldb.Ldb(self.filename)
189 s = l.search_iterator()
195 except RuntimeError as re:
200 except RuntimeError as re:
205 except RuntimeError as re:
208 s = l.search_iterator()
211 self.assertTrue(isinstance(me, ldb.Message))
214 self.assertEqual(len(r), 0)
215 self.assertEqual(count, 0)
218 m1.dn = ldb.Dn(l, "dc=foo4")
222 s = l.search_iterator()
225 self.assertTrue(isinstance(me, ldb.Message))
229 self.assertEqual(len(r), 0)
230 self.assertEqual(len(msgs), 1)
231 self.assertEqual(msgs[0].dn, m1.dn)
234 m2.dn = ldb.Dn(l, "dc=foo5")
238 s = l.search_iterator()
241 self.assertTrue(isinstance(me, ldb.Message))
245 self.assertEqual(len(r), 0)
246 self.assertEqual(len(msgs), 2)
247 if msgs[0].dn == m1.dn:
248 self.assertEqual(msgs[0].dn, m1.dn)
249 self.assertEqual(msgs[1].dn, m2.dn)
251 self.assertEqual(msgs[0].dn, m2.dn)
252 self.assertEqual(msgs[1].dn, m1.dn)
254 s = l.search_iterator()
257 self.assertTrue(isinstance(me, ldb.Message))
264 except RuntimeError as re:
267 self.assertTrue(isinstance(me, ldb.Message))
275 self.assertEqual(len(r), 0)
276 self.assertEqual(len(msgs), 2)
277 if msgs[0].dn == m1.dn:
278 self.assertEqual(msgs[0].dn, m1.dn)
279 self.assertEqual(msgs[1].dn, m2.dn)
281 self.assertEqual(msgs[0].dn, m2.dn)
282 self.assertEqual(msgs[1].dn, m1.dn)
284 l.delete(ldb.Dn(l, "dc=foo4"))
285 l.delete(ldb.Dn(l, "dc=foo5"))
287 def test_add_text(self):
288 l = ldb.Ldb(self.filename)
290 m.dn = ldb.Dn(l, "dc=foo4")
292 self.assertEqual(len(l.search()), 0)
295 self.assertEqual(len(l.search()), 1)
297 l.delete(ldb.Dn(l, "dc=foo4"))
299 def test_add_w_unhandled_ctrl(self):
300 l = ldb.Ldb(self.filename)
302 m.dn = ldb.Dn(l, "dc=foo4")
304 self.assertEqual(len(l.search()), 0)
305 self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
307 def test_add_dict(self):
308 l = ldb.Ldb(self.filename)
309 m = {"dn": ldb.Dn(l, "dc=foo5"),
311 self.assertEqual(len(l.search()), 0)
314 self.assertEqual(len(l.search()), 1)
316 l.delete(ldb.Dn(l, "dc=foo5"))
318 def test_add_dict_text(self):
319 l = ldb.Ldb(self.filename)
320 m = {"dn": ldb.Dn(l, "dc=foo5"),
322 self.assertEqual(len(l.search()), 0)
325 self.assertEqual(len(l.search()), 1)
327 l.delete(ldb.Dn(l, "dc=foo5"))
329 def test_add_dict_string_dn(self):
330 l = ldb.Ldb(self.filename)
331 m = {"dn": "dc=foo6", "bla": b"bla"}
332 self.assertEqual(len(l.search()), 0)
335 self.assertEqual(len(l.search()), 1)
337 l.delete(ldb.Dn(l, "dc=foo6"))
339 def test_add_dict_bytes_dn(self):
340 l = ldb.Ldb(self.filename)
341 m = {"dn": b"dc=foo6", "bla": b"bla"}
342 self.assertEqual(len(l.search()), 0)
345 self.assertEqual(len(l.search()), 1)
347 l.delete(ldb.Dn(l, "dc=foo6"))
349 def test_rename(self):
350 l = ldb.Ldb(self.filename)
352 m.dn = ldb.Dn(l, "dc=foo7")
354 self.assertEqual(len(l.search()), 0)
357 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
358 self.assertEqual(len(l.search()), 1)
360 l.delete(ldb.Dn(l, "dc=bar"))
362 def test_rename_string_dns(self):
363 l = ldb.Ldb(self.filename)
365 m.dn = ldb.Dn(l, "dc=foo8")
367 self.assertEqual(len(l.search()), 0)
369 self.assertEqual(len(l.search()), 1)
371 l.rename("dc=foo8", "dc=bar")
372 self.assertEqual(len(l.search()), 1)
374 l.delete(ldb.Dn(l, "dc=bar"))
376 def test_empty_dn(self):
377 l = ldb.Ldb(self.filename)
378 self.assertEqual(0, len(l.search()))
380 m.dn = ldb.Dn(l, "dc=empty")
383 self.assertEqual(1, len(rm))
384 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
387 self.assertEqual(1, len(rm))
388 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
389 rm = l.search(m.dn, attrs=["blah"])
390 self.assertEqual(1, len(rm))
391 self.assertEqual(0, len(rm[0]))
393 def test_modify_delete(self):
394 l = ldb.Ldb(self.filename)
396 m.dn = ldb.Dn(l, "dc=modifydelete")
399 rm = l.search(m.dn)[0]
400 self.assertEqual([b"1234"], list(rm["bla"]))
403 m.dn = ldb.Dn(l, "dc=modifydelete")
404 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
405 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
408 self.assertEqual(1, len(rm))
409 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
410 rm = l.search(m.dn, attrs=["bla"])
411 self.assertEqual(1, len(rm))
412 self.assertEqual(0, len(rm[0]))
414 l.delete(ldb.Dn(l, "dc=modifydelete"))
416 def test_modify_delete_text(self):
417 l = ldb.Ldb(self.filename)
419 m.dn = ldb.Dn(l, "dc=modifydelete")
420 m.text["bla"] = ["1234"]
422 rm = l.search(m.dn)[0]
423 self.assertEqual(["1234"], list(rm.text["bla"]))
426 m.dn = ldb.Dn(l, "dc=modifydelete")
427 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
428 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
431 self.assertEqual(1, len(rm))
432 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
433 rm = l.search(m.dn, attrs=["bla"])
434 self.assertEqual(1, len(rm))
435 self.assertEqual(0, len(rm[0]))
437 l.delete(ldb.Dn(l, "dc=modifydelete"))
439 def test_modify_add(self):
440 l = ldb.Ldb(self.filename)
442 m.dn = ldb.Dn(l, "dc=add")
447 m.dn = ldb.Dn(l, "dc=add")
448 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
449 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
451 rm = l.search(m.dn)[0]
452 self.assertEqual(2, len(rm))
453 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
455 l.delete(ldb.Dn(l, "dc=add"))
457 def test_modify_add_text(self):
458 l = ldb.Ldb(self.filename)
460 m.dn = ldb.Dn(l, "dc=add")
461 m.text["bla"] = ["1234"]
465 m.dn = ldb.Dn(l, "dc=add")
466 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
467 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
469 rm = l.search(m.dn)[0]
470 self.assertEqual(2, len(rm))
471 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
473 l.delete(ldb.Dn(l, "dc=add"))
475 def test_modify_replace(self):
476 l = ldb.Ldb(self.filename)
478 m.dn = ldb.Dn(l, "dc=modify2")
479 m["bla"] = [b"1234", b"456"]
483 m.dn = ldb.Dn(l, "dc=modify2")
484 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
485 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
487 rm = l.search(m.dn)[0]
488 self.assertEqual(2, len(rm))
489 self.assertEqual([b"789"], list(rm["bla"]))
490 rm = l.search(m.dn, attrs=["bla"])[0]
491 self.assertEqual(1, len(rm))
493 l.delete(ldb.Dn(l, "dc=modify2"))
495 def test_modify_replace_text(self):
496 l = ldb.Ldb(self.filename)
498 m.dn = ldb.Dn(l, "dc=modify2")
499 m.text["bla"] = ["1234", "456"]
503 m.dn = ldb.Dn(l, "dc=modify2")
504 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
505 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
507 rm = l.search(m.dn)[0]
508 self.assertEqual(2, len(rm))
509 self.assertEqual(["789"], list(rm.text["bla"]))
510 rm = l.search(m.dn, attrs=["bla"])[0]
511 self.assertEqual(1, len(rm))
513 l.delete(ldb.Dn(l, "dc=modify2"))
515 def test_modify_flags_change(self):
516 l = ldb.Ldb(self.filename)
518 m.dn = ldb.Dn(l, "dc=add")
523 m.dn = ldb.Dn(l, "dc=add")
524 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
525 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
527 rm = l.search(m.dn)[0]
528 self.assertEqual(2, len(rm))
529 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
531 # Now create another modify, but switch the flags before we do it
532 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
533 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
535 rm = l.search(m.dn, attrs=["bla"])[0]
536 self.assertEqual(1, len(rm))
537 self.assertEqual([b"1234"], list(rm["bla"]))
539 l.delete(ldb.Dn(l, "dc=add"))
541 def test_modify_flags_change_text(self):
542 l = ldb.Ldb(self.filename)
544 m.dn = ldb.Dn(l, "dc=add")
545 m.text["bla"] = ["1234"]
549 m.dn = ldb.Dn(l, "dc=add")
550 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
551 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
553 rm = l.search(m.dn)[0]
554 self.assertEqual(2, len(rm))
555 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
557 # Now create another modify, but switch the flags before we do it
558 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
559 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
561 rm = l.search(m.dn, attrs=["bla"])[0]
562 self.assertEqual(1, len(rm))
563 self.assertEqual(["1234"], list(rm.text["bla"]))
565 l.delete(ldb.Dn(l, "dc=add"))
567 def test_transaction_commit(self):
568 l = ldb.Ldb(self.filename)
569 l.transaction_start()
570 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
573 l.transaction_commit()
576 def test_transaction_cancel(self):
577 l = ldb.Ldb(self.filename)
578 l.transaction_start()
579 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
582 l.transaction_cancel()
583 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
585 def test_set_debug(self):
586 def my_report_fn(level, text):
588 l = ldb.Ldb(self.filename)
589 l.set_debug(my_report_fn)
591 def test_zero_byte_string(self):
592 """Testing we do not get trapped in the \0 byte in a property string."""
593 l = ldb.Ldb(self.filename)
596 "objectclass" : b"user",
597 "cN" : b"LDAPtestUSER",
598 "givenname" : b"ldap",
599 "displayname" : b"foo\0bar",
601 res = l.search(expression="(dn=dc=somedn)")
602 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
604 def test_no_crash_broken_expr(self):
605 l = ldb.Ldb(self.filename)
606 self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
609 class DnTests(TestCase):
612 super(DnTests, self).setUp()
613 self.testdir = tempdir()
614 self.filename = os.path.join(self.testdir, "test.ldb")
615 self.ldb = ldb.Ldb(self.filename)
618 shutil.rmtree(self.testdir)
619 super(DnTests, self).tearDown()
621 def test_set_dn_invalid(self):
625 self.assertRaises(TypeError, assign)
628 x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
629 y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
630 self.assertEqual(x, y)
631 y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
632 self.assertNotEqual(x, y)
635 x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
636 self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
639 x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
640 self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
642 def test_get_casefold(self):
643 x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
644 self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
646 def test_validate(self):
647 x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
648 self.assertTrue(x.validate())
650 def test_parent(self):
651 x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
652 self.assertEqual("bar=bloe", x.parent().__str__())
654 def test_parent_nonexistent(self):
655 x = ldb.Dn(self.ldb, "@BLA")
656 self.assertEqual(None, x.parent())
658 def test_is_valid(self):
659 x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
660 self.assertTrue(x.is_valid())
661 x = ldb.Dn(self.ldb, "")
662 self.assertTrue(x.is_valid())
664 def test_is_special(self):
665 x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
666 self.assertFalse(x.is_special())
667 x = ldb.Dn(self.ldb, "@FOOBAR")
668 self.assertTrue(x.is_special())
670 def test_check_special(self):
671 x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
672 self.assertFalse(x.check_special("FOOBAR"))
673 x = ldb.Dn(self.ldb, "@FOOBAR")
674 self.assertTrue(x.check_special("@FOOBAR"))
677 x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
678 self.assertEqual(2, len(x))
679 x = ldb.Dn(self.ldb, "dc=foo21")
680 self.assertEqual(1, len(x))
682 def test_add_child(self):
683 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
684 self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
685 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
687 def test_add_base(self):
688 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
689 base = ldb.Dn(self.ldb, "bla=bloe")
690 self.assertTrue(x.add_base(base))
691 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
693 def test_add_child_str(self):
694 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
695 self.assertTrue(x.add_child("bla=bloe"))
696 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
698 def test_add_base_str(self):
699 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
701 self.assertTrue(x.add_base(base))
702 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
705 x = ldb.Dn(self.ldb, "dc=foo24")
706 y = ldb.Dn(self.ldb, "bar=bla")
707 self.assertEqual("dc=foo24,bar=bla", str(x + y))
709 def test_remove_base_components(self):
710 x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
711 x.remove_base_components(len(x)-1)
712 self.assertEqual("dc=foo24", str(x))
714 def test_parse_ldif(self):
715 msgs = self.ldb.parse_ldif("dn: foo=bar\n")
717 self.assertEqual("foo=bar", str(msg[1].dn))
718 self.assertTrue(isinstance(msg[1], ldb.Message))
719 ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
720 self.assertEqual("dn: foo=bar\n\n", ldif)
722 def test_parse_ldif_more(self):
723 msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
725 self.assertEqual("foo=bar", str(msg[1].dn))
727 self.assertEqual("bar=bar", str(msg[1].dn))
729 def test_canonical_string(self):
730 x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
731 self.assertEqual("/bloe/foo25", x.canonical_str())
733 def test_canonical_ex_string(self):
734 x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
735 self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
737 def test_ldb_is_child_of(self):
738 """Testing ldb_dn_compare_dn"""
739 dn1 = ldb.Dn(self.ldb, "dc=base")
740 dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
741 dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
742 dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
744 self.assertTrue(dn2.is_child_of(dn1))
745 self.assertTrue(dn4.is_child_of(dn1))
746 self.assertTrue(dn4.is_child_of(dn3))
747 self.assertFalse(dn3.is_child_of(dn2))
748 self.assertFalse(dn1.is_child_of(dn4))
750 def test_ldb_is_child_of_str(self):
751 """Testing ldb_dn_compare_dn"""
753 dn2_str = "cn=foo,dc=base"
754 dn3_str = "cn=bar,dc=base"
755 dn4_str = "cn=baz,cn=bar,dc=base"
757 dn1 = ldb.Dn(self.ldb, dn1_str)
758 dn2 = ldb.Dn(self.ldb, dn2_str)
759 dn3 = ldb.Dn(self.ldb, dn3_str)
760 dn4 = ldb.Dn(self.ldb, dn4_str)
762 self.assertTrue(dn2.is_child_of(dn1_str))
763 self.assertTrue(dn4.is_child_of(dn1_str))
764 self.assertTrue(dn4.is_child_of(dn3_str))
765 self.assertFalse(dn3.is_child_of(dn2_str))
766 self.assertFalse(dn1.is_child_of(dn4_str))
768 def test_get_component_name(self):
769 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
770 self.assertEqual(dn.get_component_name(0), 'cn')
771 self.assertEqual(dn.get_component_name(1), 'dc')
772 self.assertEqual(dn.get_component_name(2), None)
773 self.assertEqual(dn.get_component_name(-1), None)
775 def test_get_component_value(self):
776 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
777 self.assertEqual(dn.get_component_value(0), 'foo')
778 self.assertEqual(dn.get_component_value(1), 'base')
779 self.assertEqual(dn.get_component_name(2), None)
780 self.assertEqual(dn.get_component_name(-1), None)
782 def test_set_component(self):
783 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
784 dn.set_component(0, 'cn', 'bar')
785 self.assertEqual(str(dn), "cn=bar,dc=base")
786 dn.set_component(1, 'o', 'asep')
787 self.assertEqual(str(dn), "cn=bar,o=asep")
788 self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
789 self.assertEqual(str(dn), "cn=bar,o=asep")
790 dn.set_component(1, 'o', 'a,b+c')
791 self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
793 def test_set_component_bytes(self):
794 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
795 dn.set_component(0, 'cn', b'bar')
796 self.assertEqual(str(dn), "cn=bar,dc=base")
797 dn.set_component(1, 'o', b'asep')
798 self.assertEqual(str(dn), "cn=bar,o=asep")
800 def test_set_component_none(self):
801 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
802 self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
804 def test_get_extended_component_null(self):
805 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
806 self.assertEqual(dn.get_extended_component("TEST"), None)
808 def test_get_extended_component(self):
809 self.ldb._register_test_extensions()
810 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
811 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
813 def test_set_extended_component(self):
814 self.ldb._register_test_extensions()
815 dn = ldb.Dn(self.ldb, "dc=base")
816 dn.set_extended_component("TEST", "foo")
817 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
818 dn.set_extended_component("TEST", b"bar")
819 self.assertEqual(dn.get_extended_component("TEST"), b"bar")
821 def test_extended_str(self):
822 self.ldb._register_test_extensions()
823 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
824 self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
826 def test_get_rdn_name(self):
827 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
828 self.assertEqual(dn.get_rdn_name(), 'cn')
830 def test_get_rdn_value(self):
831 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
832 self.assertEqual(dn.get_rdn_value(), 'foo')
834 def test_get_casefold(self):
835 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
836 self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
838 def test_get_linearized(self):
839 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
840 self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
842 def test_is_null(self):
843 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
844 self.assertFalse(dn.is_null())
846 dn = ldb.Dn(self.ldb, '')
847 self.assertTrue(dn.is_null())
849 class LdbMsgTests(TestCase):
852 super(LdbMsgTests, self).setUp()
853 self.msg = ldb.Message()
854 self.testdir = tempdir()
855 self.filename = os.path.join(self.testdir, "test.ldb")
858 shutil.rmtree(self.testdir)
859 super(LdbMsgTests, self).tearDown()
861 def test_init_dn(self):
862 self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
863 self.assertEqual("dc=foo27", str(self.msg.dn))
865 def test_iter_items(self):
866 self.assertEqual(0, len(self.msg.items()))
867 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo28")
868 self.assertEqual(1, len(self.msg.items()))
871 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo29")
872 self.msg["dc"] = b"foo"
874 self.assertIn(repr(self.msg), [
875 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
876 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
878 self.assertIn(repr(self.msg.text), [
879 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
880 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
885 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
888 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
891 self.assertEqual(0, len(self.msg))
893 def test_notpresent(self):
894 self.assertRaises(KeyError, lambda: self.msg["foo"])
900 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
902 def test_add_text(self):
903 self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
905 def test_elements_empty(self):
906 self.assertEqual([], self.msg.elements())
908 def test_elements(self):
909 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
911 self.assertEqual([el], self.msg.elements())
912 self.assertEqual([el.text], self.msg.text.elements())
914 def test_add_value(self):
915 self.assertEqual(0, len(self.msg))
916 self.msg["foo"] = [b"foo"]
917 self.assertEqual(1, len(self.msg))
919 def test_add_value_text(self):
920 self.assertEqual(0, len(self.msg))
921 self.msg["foo"] = ["foo"]
922 self.assertEqual(1, len(self.msg))
924 def test_add_value_multiple(self):
925 self.assertEqual(0, len(self.msg))
926 self.msg["foo"] = [b"foo", b"bla"]
927 self.assertEqual(1, len(self.msg))
928 self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
930 def test_add_value_multiple_text(self):
931 self.assertEqual(0, len(self.msg))
932 self.msg["foo"] = ["foo", "bla"]
933 self.assertEqual(1, len(self.msg))
934 self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
936 def test_set_value(self):
937 self.msg["foo"] = [b"fool"]
938 self.assertEqual([b"fool"], list(self.msg["foo"]))
939 self.msg["foo"] = [b"bar"]
940 self.assertEqual([b"bar"], list(self.msg["foo"]))
942 def test_set_value_text(self):
943 self.msg["foo"] = ["fool"]
944 self.assertEqual(["fool"], list(self.msg.text["foo"]))
945 self.msg["foo"] = ["bar"]
946 self.assertEqual(["bar"], list(self.msg.text["foo"]))
949 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
950 self.msg["foo"] = [b"bla"]
951 self.msg["bar"] = [b"bla"]
952 self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
954 def test_keys_text(self):
955 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
956 self.msg["foo"] = ["bla"]
957 self.msg["bar"] = ["bla"]
958 self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
961 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
962 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
964 def test_get_dn(self):
965 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
966 self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
968 def test_dn_text(self):
969 self.msg.text.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
970 self.assertEqual("@BASEINFO", str(self.msg.dn))
971 self.assertEqual("@BASEINFO", str(self.msg.text.dn))
973 def test_get_dn_text(self):
974 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
975 self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
976 self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
978 def test_get_invalid(self):
979 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
980 self.assertRaises(TypeError, self.msg.get, 42)
982 def test_get_other(self):
983 self.msg["foo"] = [b"bar"]
984 self.assertEqual(b"bar", self.msg.get("foo")[0])
985 self.assertEqual(b"bar", self.msg.get("foo", idx=0))
986 self.assertEqual(None, self.msg.get("foo", idx=1))
987 self.assertEqual("", self.msg.get("foo", default='', idx=1))
989 def test_get_other_text(self):
990 self.msg["foo"] = ["bar"]
991 self.assertEqual(["bar"], list(self.msg.text.get("foo")))
992 self.assertEqual("bar", self.msg.text.get("foo")[0])
993 self.assertEqual("bar", self.msg.text.get("foo", idx=0))
994 self.assertEqual(None, self.msg.get("foo", idx=1))
995 self.assertEqual("", self.msg.get("foo", default='', idx=1))
997 def test_get_default(self):
998 self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
999 self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
1001 def test_get_default_text(self):
1002 self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
1003 self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
1005 def test_get_unknown(self):
1006 self.assertEqual(None, self.msg.get("lalalala"))
1008 def test_get_unknown_text(self):
1009 self.assertEqual(None, self.msg.text.get("lalalala"))
1011 def test_msg_diff(self):
1013 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
1014 msg1 = next(msgs)[1]
1015 msg2 = next(msgs)[1]
1016 msgdiff = l.msg_diff(msg1, msg2)
1017 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
1018 self.assertRaises(KeyError, lambda: msgdiff["foo"])
1019 self.assertEqual(1, len(msgdiff))
1021 def test_equal_empty(self):
1022 msg1 = ldb.Message()
1023 msg2 = ldb.Message()
1024 self.assertEqual(msg1, msg2)
1026 def test_equal_simplel(self):
1027 db = ldb.Ldb(self.filename)
1028 msg1 = ldb.Message()
1029 msg1.dn = ldb.Dn(db, "foo=bar")
1030 msg2 = ldb.Message()
1031 msg2.dn = ldb.Dn(db, "foo=bar")
1032 self.assertEqual(msg1, msg2)
1033 msg1['foo'] = b'bar'
1034 msg2['foo'] = b'bar'
1035 self.assertEqual(msg1, msg2)
1036 msg2['foo'] = b'blie'
1037 self.assertNotEqual(msg1, msg2)
1038 msg2['foo'] = b'blie'
1040 def test_from_dict(self):
1041 rec = {"dn": "dc=fromdict",
1042 "a1": [b"a1-val1", b"a1-val1"]}
1044 # check different types of input Flags
1045 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
1046 m = ldb.Message.from_dict(l, rec, flags)
1047 self.assertEqual(rec["a1"], list(m["a1"]))
1048 self.assertEqual(flags, m["a1"].flags())
1049 # check input params
1050 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
1051 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
1052 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
1053 # Message.from_dict expects dictionary with 'dn'
1054 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
1055 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
1057 def test_from_dict_text(self):
1058 rec = {"dn": "dc=fromdict",
1059 "a1": ["a1-val1", "a1-val1"]}
1061 # check different types of input Flags
1062 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
1063 m = ldb.Message.from_dict(l, rec, flags)
1064 self.assertEqual(rec["a1"], list(m.text["a1"]))
1065 self.assertEqual(flags, m.text["a1"].flags())
1066 # check input params
1067 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
1068 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
1069 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
1070 # Message.from_dict expects dictionary with 'dn'
1071 err_rec = {"a1": ["a1-val1", "a1-val1"]}
1072 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
1074 def test_copy_add_message_element(self):
1076 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
1077 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
1081 self.assertEqual(mto["1"], m["1"])
1082 self.assertEqual(mto["2"], m["2"])
1086 self.assertEqual(mto["1"], m["1"])
1087 self.assertEqual(mto["2"], m["2"])
1089 def test_copy_add_message_element_text(self):
1091 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
1092 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
1096 self.assertEqual(mto["1"], m.text["1"])
1097 self.assertEqual(mto["2"], m.text["2"])
1101 self.assertEqual(mto.text["1"], m.text["1"])
1102 self.assertEqual(mto.text["2"], m.text["2"])
1103 self.assertEqual(mto["1"], m["1"])
1104 self.assertEqual(mto["2"], m["2"])
1107 class MessageElementTests(TestCase):
1109 def test_cmp_element(self):
1110 x = ldb.MessageElement([b"foo"])
1111 y = ldb.MessageElement([b"foo"])
1112 z = ldb.MessageElement([b"bzr"])
1113 self.assertEqual(x, y)
1114 self.assertNotEqual(x, z)
1116 def test_cmp_element_text(self):
1117 x = ldb.MessageElement([b"foo"])
1118 y = ldb.MessageElement(["foo"])
1119 self.assertEqual(x, y)
1121 def test_create_iterable(self):
1122 x = ldb.MessageElement([b"foo"])
1123 self.assertEqual([b"foo"], list(x))
1124 self.assertEqual(["foo"], list(x.text))
1126 def test_repr(self):
1127 x = ldb.MessageElement([b"foo"])
1129 self.assertEqual("MessageElement([b'foo'])", repr(x))
1130 self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
1132 self.assertEqual("MessageElement(['foo'])", repr(x))
1133 self.assertEqual("MessageElement(['foo']).text", repr(x.text))
1134 x = ldb.MessageElement([b"foo", b"bla"])
1135 self.assertEqual(2, len(x))
1137 self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
1138 self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
1140 self.assertEqual("MessageElement(['foo','bla'])", repr(x))
1141 self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
1143 def test_get_item(self):
1144 x = ldb.MessageElement([b"foo", b"bar"])
1145 self.assertEqual(b"foo", x[0])
1146 self.assertEqual(b"bar", x[1])
1147 self.assertEqual(b"bar", x[-1])
1148 self.assertRaises(IndexError, lambda: x[45])
1150 def test_get_item_text(self):
1151 x = ldb.MessageElement(["foo", "bar"])
1152 self.assertEqual("foo", x.text[0])
1153 self.assertEqual("bar", x.text[1])
1154 self.assertEqual("bar", x.text[-1])
1155 self.assertRaises(IndexError, lambda: x[45])
1158 x = ldb.MessageElement([b"foo", b"bar"])
1159 self.assertEqual(2, len(x))
1162 x = ldb.MessageElement([b"foo", b"bar"])
1163 y = ldb.MessageElement([b"foo", b"bar"])
1164 self.assertEqual(y, x)
1165 x = ldb.MessageElement([b"foo"])
1166 self.assertNotEqual(y, x)
1167 y = ldb.MessageElement([b"foo"])
1168 self.assertEqual(y, x)
1170 def test_extended(self):
1171 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
1173 self.assertEqual("MessageElement([b'456'])", repr(el))
1174 self.assertEqual("MessageElement([b'456']).text", repr(el.text))
1176 self.assertEqual("MessageElement(['456'])", repr(el))
1177 self.assertEqual("MessageElement(['456']).text", repr(el.text))
1179 def test_bad_text(self):
1180 el = ldb.MessageElement(b'\xba\xdd')
1181 self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
1184 class ModuleTests(TestCase):
1187 super(ModuleTests, self).setUp()
1188 self.testdir = tempdir()
1189 self.filename = os.path.join(self.testdir, "test.ldb")
1190 self.ldb = ldb.Ldb(self.filename)
1193 shutil.rmtree(self.testdir)
1194 super(ModuleTests, self).setUp()
1196 def test_register_module(self):
1197 class ExampleModule:
1199 ldb.register_module(ExampleModule)
1201 def test_use_module(self):
1203 class ExampleModule:
1206 def __init__(self, ldb, next):
1210 def search(self, *args, **kwargs):
1211 return self.next.search(*args, **kwargs)
1213 def request(self, *args, **kwargs):
1216 ldb.register_module(ExampleModule)
1217 l = ldb.Ldb(self.filename)
1218 l.add({"dn": "@MODULES", "@LIST": "bla"})
1219 self.assertEqual([], ops)
1220 l = ldb.Ldb(self.filename)
1221 self.assertEqual(["init"], ops)
1223 class LdbResultTests(TestCase):
1226 super(LdbResultTests, self).setUp()
1227 self.testdir = tempdir()
1228 self.filename = os.path.join(self.testdir, "test.ldb")
1229 self.l = ldb.Ldb(self.filename)
1230 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
1231 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
1232 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
1233 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1"})
1234 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2"})
1235 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3"})
1236 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4"})
1237 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5"})
1238 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6"})
1239 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7"})
1240 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8"})
1241 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9"})
1242 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10"})
1245 shutil.rmtree(self.testdir)
1246 super(LdbResultTests, self).tearDown()
1248 def test_return_type(self):
1249 res = self.l.search()
1250 self.assertEqual(str(res), "<ldb result>")
1252 def test_get_msgs(self):
1253 res = self.l.search()
1256 def test_get_controls(self):
1257 res = self.l.search()
1260 def test_get_referals(self):
1261 res = self.l.search()
1264 def test_iter_msgs(self):
1266 for l in self.l.search().msgs:
1267 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1269 self.assertTrue(found)
1271 def test_iter_msgs_count(self):
1272 self.assertTrue(self.l.search().count > 0)
1273 # 13 objects has been added to the DC=SAMBA, DC=ORG
1274 self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
1276 def test_iter_controls(self):
1277 res = self.l.search().controls
1280 def test_create_control(self):
1281 self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
1282 c = ldb.Control(self.l, "relax:1")
1283 self.assertEqual(c.critical, True)
1284 self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
1286 def test_iter_refs(self):
1287 res = self.l.search().referals
1290 def test_search_sequence_msgs(self):
1292 res = self.l.search().msgs
1294 for i in range(0, len(res)):
1296 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1298 self.assertTrue(found)
1300 def test_search_as_iter(self):
1302 res = self.l.search()
1305 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1307 self.assertTrue(found)
1309 def test_search_iter(self):
1311 res = self.l.search_iterator()
1314 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1316 self.assertTrue(found)
1319 # Show that search results can't see into a transaction
1320 def test_search_against_trans(self):
1323 (r1, w1) = os.pipe()
1325 (r2, w2) = os.pipe()
1327 # For the first element, fork a child that will
1331 # In the child, re-open
1335 child_ldb = ldb.Ldb(self.filename)
1336 # start a transaction
1337 child_ldb.transaction_start()
1340 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
1341 "name": b"samba.org"})
1343 os.write(w1, b"added")
1345 # Now wait for the search to be done
1350 child_ldb.transaction_commit()
1351 except LdbError as err:
1352 # We print this here to see what went wrong in the child
1356 os.write(w1, b"transaction")
1359 self.assertEqual(os.read(r1, 5), b"added")
1361 # This should not turn up until the transaction is concluded
1362 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1363 scope=ldb.SCOPE_BASE)
1364 self.assertEqual(len(res11), 0)
1366 os.write(w2, b"search")
1368 # Now wait for the transaction to be done. This should
1369 # deadlock, but the search doesn't hold a read lock for the
1370 # iterator lifetime currently.
1371 self.assertEqual(os.read(r1, 11), b"transaction")
1373 # This should now turn up, as the transaction is over
1374 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1375 scope=ldb.SCOPE_BASE)
1376 self.assertEqual(len(res11), 1)
1378 self.assertFalse(found11)
1380 (got_pid, status) = os.waitpid(pid, 0)
1381 self.assertEqual(got_pid, pid)
1384 def test_search_iter_against_trans(self):
1388 # We need to hold this iterator open to hold the all-record
1390 res = self.l.search_iterator()
1392 (r1, w1) = os.pipe()
1394 (r2, w2) = os.pipe()
1396 # For the first element, with the sequence open (which
1397 # means with ldb locks held), fork a child that will
1401 # In the child, re-open
1406 child_ldb = ldb.Ldb(self.filename)
1407 # start a transaction
1408 child_ldb.transaction_start()
1411 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
1412 "name": b"samba.org"})
1414 os.write(w1, b"added")
1416 # Now wait for the search to be done
1421 child_ldb.transaction_commit()
1422 except LdbError as err:
1423 # We print this here to see what went wrong in the child
1427 os.write(w1, b"transaction")
1430 self.assertEqual(os.read(r1, 5), b"added")
1432 # This should not turn up until the transaction is concluded
1433 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1434 scope=ldb.SCOPE_BASE)
1435 self.assertEqual(len(res11), 0)
1437 os.write(w2, b"search")
1439 # allow the transaction to start
1442 # This should not turn up until the search finishes and
1443 # removed the read lock, but for ldb_tdb that happened as soon
1444 # as we called the first res.next()
1445 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1446 scope=ldb.SCOPE_BASE)
1447 self.assertEqual(len(res11), 0)
1449 # These results are all collected at the first next(res) call
1451 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1453 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
1456 # Now wait for the transaction to be done.
1457 self.assertEqual(os.read(r1, 11), b"transaction")
1459 # This should now turn up, as the transaction is over and all
1460 # read locks are gone
1461 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1462 scope=ldb.SCOPE_BASE)
1463 self.assertEqual(len(res11), 1)
1465 self.assertTrue(found)
1466 self.assertFalse(found11)
1468 (got_pid, status) = os.waitpid(pid, 0)
1469 self.assertEqual(got_pid, pid)
1472 class BadTypeTests(TestCase):
1473 def test_control(self):
1475 self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
1476 self.assertRaises(TypeError, ldb.Control, ldb, 1234)
1478 def test_modify(self):
1480 dn = ldb.Dn(l, 'a=b')
1482 self.assertRaises(TypeError, l.modify, '<bad type>')
1483 self.assertRaises(TypeError, l.modify, m, '<bad type>')
1487 dn = ldb.Dn(l, 'a=b')
1489 self.assertRaises(TypeError, l.add, '<bad type>')
1490 self.assertRaises(TypeError, l.add, m, '<bad type>')
1492 def test_delete(self):
1494 dn = ldb.Dn(l, 'a=b')
1495 self.assertRaises(TypeError, l.add, '<bad type>')
1496 self.assertRaises(TypeError, l.add, dn, '<bad type>')
1498 def test_rename(self):
1500 dn = ldb.Dn(l, 'a=b')
1501 self.assertRaises(TypeError, l.add, '<bad type>', dn)
1502 self.assertRaises(TypeError, l.add, dn, '<bad type>')
1503 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
1505 def test_search(self):
1507 self.assertRaises(TypeError, l.search, base=1234)
1508 self.assertRaises(TypeError, l.search, scope='<bad type>')
1509 self.assertRaises(TypeError, l.search, expression=1234)
1510 self.assertRaises(TypeError, l.search, attrs='<bad type>')
1511 self.assertRaises(TypeError, l.search, controls='<bad type>')
1514 class VersionTests(TestCase):
1516 def test_version(self):
1517 self.assertTrue(isinstance(ldb.__version__, str))
1520 if __name__ == '__main__':
1522 unittest.TestProgram()