ldb: Add tests for when we should expect a full scan
[vlendec/samba-autobuild/.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
13 PY3 = sys.version_info > (3, 0)
14
15 TDB_PREFIX = "tdb://"
16 MDB_PREFIX = "mdb://"
17
18 MDB_INDEX_OBJ = {
19     "dn": "@INDEXLIST",
20     "@IDXONE": [b"1"],
21     "@IDXGUID": [b"objectUUID"],
22     "@IDX_DN_GUID": [b"GUID"]
23 }
24
25 def tempdir():
26     import tempfile
27     try:
28         dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
29     except KeyError:
30         dir_prefix = None
31     return tempfile.mkdtemp(dir=dir_prefix)
32
33
34 class NoContextTests(TestCase):
35
36     def test_valid_attr_name(self):
37         self.assertTrue(ldb.valid_attr_name("foo"))
38         self.assertFalse(ldb.valid_attr_name("24foo"))
39
40     def test_timestring(self):
41         self.assertEqual("19700101000000.0Z", ldb.timestring(0))
42         self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
43
44     def test_string_to_time(self):
45         self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
46         self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
47
48     def test_binary_encode(self):
49         encoded = ldb.binary_encode(b'test\\x')
50         decoded = ldb.binary_decode(encoded)
51         self.assertEqual(decoded, b'test\\x')
52
53         encoded2 = ldb.binary_encode('test\\x')
54         self.assertEqual(encoded2, encoded)
55
56
57 class LdbBaseTest(TestCase):
58     def setUp(self):
59         super(LdbBaseTest, self).setUp()
60         try:
61             if self.prefix is None:
62                 self.prefix = TDB_PREFIX
63         except AttributeError:
64             self.prefix = TDB_PREFIX
65
66     def tearDown(self):
67         super(LdbBaseTest, self).tearDown()
68
69     def url(self):
70         return self.prefix + self.filename
71
72     def flags(self):
73         if self.prefix == MDB_PREFIX:
74             return ldb.FLG_NOSYNC
75         else:
76             return 0
77
78
79 class SimpleLdb(LdbBaseTest):
80
81     def setUp(self):
82         super(SimpleLdb, self).setUp()
83         self.testdir = tempdir()
84         self.filename = os.path.join(self.testdir, "test.ldb")
85         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
86         try:
87             self.ldb.add(self.index)
88         except AttributeError:
89             pass
90
91     def tearDown(self):
92         shutil.rmtree(self.testdir)
93         super(SimpleLdb, self).tearDown()
94         # Ensure the LDB is closed now, so we close the FD
95         del(self.ldb)
96
97     def test_connect(self):
98         ldb.Ldb(self.url(), flags=self.flags())
99
100     def test_connect_none(self):
101         ldb.Ldb()
102
103     def test_connect_later(self):
104         x = ldb.Ldb()
105         x.connect(self.url(), flags=self.flags())
106
107     def test_repr(self):
108         x = ldb.Ldb()
109         self.assertTrue(repr(x).startswith("<ldb connection"))
110
111     def test_set_create_perms(self):
112         x = ldb.Ldb()
113         x.set_create_perms(0o600)
114
115     def test_modules_none(self):
116         x = ldb.Ldb()
117         self.assertEqual([], x.modules())
118
119     def test_modules_tdb(self):
120         x = ldb.Ldb(self.url(), flags=self.flags())
121         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
122
123     def test_firstmodule_none(self):
124         x = ldb.Ldb()
125         self.assertEqual(x.firstmodule, None)
126
127     def test_firstmodule_tdb(self):
128         x = ldb.Ldb(self.url(), flags=self.flags())
129         mod = x.firstmodule
130         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
131
132     def test_search(self):
133         l = ldb.Ldb(self.url(), flags=self.flags())
134         self.assertEqual(len(l.search()), 0)
135
136     def test_search_controls(self):
137         l = ldb.Ldb(self.url(), flags=self.flags())
138         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
139
140     def test_search_attrs(self):
141         l = ldb.Ldb(self.url(), flags=self.flags())
142         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
143
144     def test_search_string_dn(self):
145         l = ldb.Ldb(self.url(), flags=self.flags())
146         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
147
148     def test_search_attr_string(self):
149         l = ldb.Ldb(self.url(), flags=self.flags())
150         self.assertRaises(TypeError, l.search, attrs="dc")
151         self.assertRaises(TypeError, l.search, attrs=b"dc")
152
153     def test_opaque(self):
154         l = ldb.Ldb(self.url(), flags=self.flags())
155         l.set_opaque("my_opaque", l)
156         self.assertTrue(l.get_opaque("my_opaque") is not None)
157         self.assertEqual(None, l.get_opaque("unknown"))
158
159     def test_search_scope_base_empty_db(self):
160         l = ldb.Ldb(self.url(), flags=self.flags())
161         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
162                           ldb.SCOPE_BASE)), 0)
163
164     def test_search_scope_onelevel_empty_db(self):
165         l = ldb.Ldb(self.url(), flags=self.flags())
166         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
167                           ldb.SCOPE_ONELEVEL)), 0)
168
169     def test_delete(self):
170         l = ldb.Ldb(self.url(), flags=self.flags())
171         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
172
173     def test_delete_w_unhandled_ctrl(self):
174         l = ldb.Ldb(self.url(), flags=self.flags())
175         m = ldb.Message()
176         m.dn = ldb.Dn(l, "dc=foo1")
177         m["b"] = [b"a"]
178         m["objectUUID"] = b"0123456789abcdef"
179         l.add(m)
180         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
181         l.delete(m.dn)
182
183     def test_contains(self):
184         name = self.url()
185         l = ldb.Ldb(name, flags=self.flags())
186         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
187         l = ldb.Ldb(name, flags=self.flags())
188         m = ldb.Message()
189         m.dn = ldb.Dn(l, "dc=foo3")
190         m["b"] = ["a"]
191         m["objectUUID"] = b"0123456789abcdef"
192         l.add(m)
193         try:
194             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
195             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
196         finally:
197             l.delete(m.dn)
198
199     def test_get_config_basedn(self):
200         l = ldb.Ldb(self.url(), flags=self.flags())
201         self.assertEqual(None, l.get_config_basedn())
202
203     def test_get_root_basedn(self):
204         l = ldb.Ldb(self.url(), flags=self.flags())
205         self.assertEqual(None, l.get_root_basedn())
206
207     def test_get_schema_basedn(self):
208         l = ldb.Ldb(self.url(), flags=self.flags())
209         self.assertEqual(None, l.get_schema_basedn())
210
211     def test_get_default_basedn(self):
212         l = ldb.Ldb(self.url(), flags=self.flags())
213         self.assertEqual(None, l.get_default_basedn())
214
215     def test_add(self):
216         l = ldb.Ldb(self.url(), flags=self.flags())
217         m = ldb.Message()
218         m.dn = ldb.Dn(l, "dc=foo4")
219         m["bla"] = b"bla"
220         m["objectUUID"] = b"0123456789abcdef"
221         self.assertEqual(len(l.search()), 0)
222         l.add(m)
223         try:
224             self.assertEqual(len(l.search()), 1)
225         finally:
226             l.delete(ldb.Dn(l, "dc=foo4"))
227
228     def test_search_iterator(self):
229         l = ldb.Ldb(self.url(), flags=self.flags())
230         s = l.search_iterator()
231         s.abandon()
232         try:
233             for me in s:
234                 self.fail()
235             self.fail()
236         except RuntimeError as re:
237             pass
238         try:
239             s.abandon()
240             self.fail()
241         except RuntimeError as re:
242             pass
243         try:
244             s.result()
245             self.fail()
246         except RuntimeError as re:
247             pass
248
249         s = l.search_iterator()
250         count = 0
251         for me in s:
252             self.assertTrue(isinstance(me, ldb.Message))
253             count += 1
254         r = s.result()
255         self.assertEqual(len(r), 0)
256         self.assertEqual(count, 0)
257
258         m1 = ldb.Message()
259         m1.dn = ldb.Dn(l, "dc=foo4")
260         m1["bla"] = b"bla"
261         m1["objectUUID"] = b"0123456789abcdef"
262         l.add(m1)
263         try:
264             s = l.search_iterator()
265             msgs = []
266             for me in s:
267                 self.assertTrue(isinstance(me, ldb.Message))
268                 count += 1
269                 msgs.append(me)
270             r = s.result()
271             self.assertEqual(len(r), 0)
272             self.assertEqual(len(msgs), 1)
273             self.assertEqual(msgs[0].dn, m1.dn)
274
275             m2 = ldb.Message()
276             m2.dn = ldb.Dn(l, "dc=foo5")
277             m2["bla"] = b"bla"
278             m2["objectUUID"] = b"0123456789abcdee"
279             l.add(m2)
280
281             s = l.search_iterator()
282             msgs = []
283             for me in s:
284                 self.assertTrue(isinstance(me, ldb.Message))
285                 count += 1
286                 msgs.append(me)
287             r = s.result()
288             self.assertEqual(len(r), 0)
289             self.assertEqual(len(msgs), 2)
290             if msgs[0].dn == m1.dn:
291                 self.assertEqual(msgs[0].dn, m1.dn)
292                 self.assertEqual(msgs[1].dn, m2.dn)
293             else:
294                 self.assertEqual(msgs[0].dn, m2.dn)
295                 self.assertEqual(msgs[1].dn, m1.dn)
296
297             s = l.search_iterator()
298             msgs = []
299             for me in s:
300                 self.assertTrue(isinstance(me, ldb.Message))
301                 count += 1
302                 msgs.append(me)
303                 break
304             try:
305                 s.result()
306                 self.fail()
307             except RuntimeError as re:
308                 pass
309             for me in s:
310                 self.assertTrue(isinstance(me, ldb.Message))
311                 count += 1
312                 msgs.append(me)
313                 break
314             for me in s:
315                 self.fail()
316
317             r = s.result()
318             self.assertEqual(len(r), 0)
319             self.assertEqual(len(msgs), 2)
320             if msgs[0].dn == m1.dn:
321                 self.assertEqual(msgs[0].dn, m1.dn)
322                 self.assertEqual(msgs[1].dn, m2.dn)
323             else:
324                 self.assertEqual(msgs[0].dn, m2.dn)
325                 self.assertEqual(msgs[1].dn, m1.dn)
326         finally:
327             l.delete(ldb.Dn(l, "dc=foo4"))
328             l.delete(ldb.Dn(l, "dc=foo5"))
329
330     def test_add_text(self):
331         l = ldb.Ldb(self.url(), flags=self.flags())
332         m = ldb.Message()
333         m.dn = ldb.Dn(l, "dc=foo4")
334         m["bla"] = "bla"
335         m["objectUUID"] = b"0123456789abcdef"
336         self.assertEqual(len(l.search()), 0)
337         l.add(m)
338         try:
339             self.assertEqual(len(l.search()), 1)
340         finally:
341             l.delete(ldb.Dn(l, "dc=foo4"))
342
343     def test_add_w_unhandled_ctrl(self):
344         l = ldb.Ldb(self.url(), flags=self.flags())
345         m = ldb.Message()
346         m.dn = ldb.Dn(l, "dc=foo4")
347         m["bla"] = b"bla"
348         self.assertEqual(len(l.search()), 0)
349         self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
350
351     def test_add_dict(self):
352         l = ldb.Ldb(self.url(), flags=self.flags())
353         m = {"dn": ldb.Dn(l, "dc=foo5"),
354              "bla": b"bla",
355              "objectUUID": b"0123456789abcdef"}
356         self.assertEqual(len(l.search()), 0)
357         l.add(m)
358         try:
359             self.assertEqual(len(l.search()), 1)
360         finally:
361             l.delete(ldb.Dn(l, "dc=foo5"))
362
363     def test_add_dict_text(self):
364         l = ldb.Ldb(self.url(), flags=self.flags())
365         m = {"dn": ldb.Dn(l, "dc=foo5"),
366              "bla": "bla",
367              "objectUUID": b"0123456789abcdef"}
368         self.assertEqual(len(l.search()), 0)
369         l.add(m)
370         try:
371             self.assertEqual(len(l.search()), 1)
372         finally:
373             l.delete(ldb.Dn(l, "dc=foo5"))
374
375     def test_add_dict_string_dn(self):
376         l = ldb.Ldb(self.url(), flags=self.flags())
377         m = {"dn": "dc=foo6", "bla": b"bla",
378              "objectUUID": b"0123456789abcdef"}
379         self.assertEqual(len(l.search()), 0)
380         l.add(m)
381         try:
382             self.assertEqual(len(l.search()), 1)
383         finally:
384             l.delete(ldb.Dn(l, "dc=foo6"))
385
386     def test_add_dict_bytes_dn(self):
387         l = ldb.Ldb(self.url(), flags=self.flags())
388         m = {"dn": b"dc=foo6", "bla": b"bla",
389               "objectUUID": b"0123456789abcdef"}
390         self.assertEqual(len(l.search()), 0)
391         l.add(m)
392         try:
393             self.assertEqual(len(l.search()), 1)
394         finally:
395             l.delete(ldb.Dn(l, "dc=foo6"))
396
397     def test_rename(self):
398         l = ldb.Ldb(self.url(), flags=self.flags())
399         m = ldb.Message()
400         m.dn = ldb.Dn(l, "dc=foo7")
401         m["bla"] = b"bla"
402         m["objectUUID"] = b"0123456789abcdef"
403         self.assertEqual(len(l.search()), 0)
404         l.add(m)
405         try:
406             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
407             self.assertEqual(len(l.search()), 1)
408         finally:
409             l.delete(ldb.Dn(l, "dc=bar"))
410
411     def test_rename_string_dns(self):
412         l = ldb.Ldb(self.url(), flags=self.flags())
413         m = ldb.Message()
414         m.dn = ldb.Dn(l, "dc=foo8")
415         m["bla"] = b"bla"
416         m["objectUUID"] = b"0123456789abcdef"
417         self.assertEqual(len(l.search()), 0)
418         l.add(m)
419         self.assertEqual(len(l.search()), 1)
420         try:
421             l.rename("dc=foo8", "dc=bar")
422             self.assertEqual(len(l.search()), 1)
423         finally:
424             l.delete(ldb.Dn(l, "dc=bar"))
425
426     def test_empty_dn(self):
427         l = ldb.Ldb(self.url(), flags=self.flags())
428         self.assertEqual(0, len(l.search()))
429         m = ldb.Message()
430         m.dn = ldb.Dn(l, "dc=empty")
431         m["objectUUID"] = b"0123456789abcdef"
432         l.add(m)
433         rm = l.search()
434         self.assertEqual(1, len(rm))
435         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
436                          set(rm[0].keys()))
437
438         rm = l.search(m.dn)
439         self.assertEqual(1, len(rm))
440         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
441                          set(rm[0].keys()))
442         rm = l.search(m.dn, attrs=["blah"])
443         self.assertEqual(1, len(rm))
444         self.assertEqual(0, len(rm[0]))
445
446     def test_modify_delete(self):
447         l = ldb.Ldb(self.url(), flags=self.flags())
448         m = ldb.Message()
449         m.dn = ldb.Dn(l, "dc=modifydelete")
450         m["bla"] = [b"1234"]
451         m["objectUUID"] = b"0123456789abcdef"
452         l.add(m)
453         rm = l.search(m.dn)[0]
454         self.assertEqual([b"1234"], list(rm["bla"]))
455         try:
456             m = ldb.Message()
457             m.dn = ldb.Dn(l, "dc=modifydelete")
458             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
459             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
460             l.modify(m)
461             rm = l.search(m.dn)
462             self.assertEqual(1, len(rm))
463             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
464                              set(rm[0].keys()))
465             rm = l.search(m.dn, attrs=["bla"])
466             self.assertEqual(1, len(rm))
467             self.assertEqual(0, len(rm[0]))
468         finally:
469             l.delete(ldb.Dn(l, "dc=modifydelete"))
470
471     def test_modify_delete_text(self):
472         l = ldb.Ldb(self.url(), flags=self.flags())
473         m = ldb.Message()
474         m.dn = ldb.Dn(l, "dc=modifydelete")
475         m.text["bla"] = ["1234"]
476         m["objectUUID"] = b"0123456789abcdef"
477         l.add(m)
478         rm = l.search(m.dn)[0]
479         self.assertEqual(["1234"], list(rm.text["bla"]))
480         try:
481             m = ldb.Message()
482             m.dn = ldb.Dn(l, "dc=modifydelete")
483             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
484             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
485             l.modify(m)
486             rm = l.search(m.dn)
487             self.assertEqual(1, len(rm))
488             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
489                              set(rm[0].keys()))
490             rm = l.search(m.dn, attrs=["bla"])
491             self.assertEqual(1, len(rm))
492             self.assertEqual(0, len(rm[0]))
493         finally:
494             l.delete(ldb.Dn(l, "dc=modifydelete"))
495
496     def test_modify_add(self):
497         l = ldb.Ldb(self.url(), flags=self.flags())
498         m = ldb.Message()
499         m.dn = ldb.Dn(l, "dc=add")
500         m["bla"] = [b"1234"]
501         m["objectUUID"] = b"0123456789abcdef"
502         l.add(m)
503         try:
504             m = ldb.Message()
505             m.dn = ldb.Dn(l, "dc=add")
506             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
507             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
508             l.modify(m)
509             rm = l.search(m.dn)[0]
510             self.assertEqual(3, len(rm))
511             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
512         finally:
513             l.delete(ldb.Dn(l, "dc=add"))
514
515     def test_modify_add_text(self):
516         l = ldb.Ldb(self.url(), flags=self.flags())
517         m = ldb.Message()
518         m.dn = ldb.Dn(l, "dc=add")
519         m.text["bla"] = ["1234"]
520         m["objectUUID"] = b"0123456789abcdef"
521         l.add(m)
522         try:
523             m = ldb.Message()
524             m.dn = ldb.Dn(l, "dc=add")
525             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
526             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
527             l.modify(m)
528             rm = l.search(m.dn)[0]
529             self.assertEqual(3, len(rm))
530             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
531         finally:
532             l.delete(ldb.Dn(l, "dc=add"))
533
534     def test_modify_replace(self):
535         l = ldb.Ldb(self.url(), flags=self.flags())
536         m = ldb.Message()
537         m.dn = ldb.Dn(l, "dc=modify2")
538         m["bla"] = [b"1234", b"456"]
539         m["objectUUID"] = b"0123456789abcdef"
540         l.add(m)
541         try:
542             m = ldb.Message()
543             m.dn = ldb.Dn(l, "dc=modify2")
544             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
545             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
546             l.modify(m)
547             rm = l.search(m.dn)[0]
548             self.assertEqual(3, len(rm))
549             self.assertEqual([b"789"], list(rm["bla"]))
550             rm = l.search(m.dn, attrs=["bla"])[0]
551             self.assertEqual(1, len(rm))
552         finally:
553             l.delete(ldb.Dn(l, "dc=modify2"))
554
555     def test_modify_replace_text(self):
556         l = ldb.Ldb(self.url(), flags=self.flags())
557         m = ldb.Message()
558         m.dn = ldb.Dn(l, "dc=modify2")
559         m.text["bla"] = ["1234", "456"]
560         m["objectUUID"] = b"0123456789abcdef"
561         l.add(m)
562         try:
563             m = ldb.Message()
564             m.dn = ldb.Dn(l, "dc=modify2")
565             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
566             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
567             l.modify(m)
568             rm = l.search(m.dn)[0]
569             self.assertEqual(3, len(rm))
570             self.assertEqual(["789"], list(rm.text["bla"]))
571             rm = l.search(m.dn, attrs=["bla"])[0]
572             self.assertEqual(1, len(rm))
573         finally:
574             l.delete(ldb.Dn(l, "dc=modify2"))
575
576     def test_modify_flags_change(self):
577         l = ldb.Ldb(self.url(), flags=self.flags())
578         m = ldb.Message()
579         m.dn = ldb.Dn(l, "dc=add")
580         m["bla"] = [b"1234"]
581         m["objectUUID"] = b"0123456789abcdef"
582         l.add(m)
583         try:
584             m = ldb.Message()
585             m.dn = ldb.Dn(l, "dc=add")
586             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
587             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
588             l.modify(m)
589             rm = l.search(m.dn)[0]
590             self.assertEqual(3, len(rm))
591             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
592
593             # Now create another modify, but switch the flags before we do it
594             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
595             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
596             l.modify(m)
597             rm = l.search(m.dn, attrs=["bla"])[0]
598             self.assertEqual(1, len(rm))
599             self.assertEqual([b"1234"], list(rm["bla"]))
600         finally:
601             l.delete(ldb.Dn(l, "dc=add"))
602
603     def test_modify_flags_change_text(self):
604         l = ldb.Ldb(self.url(), flags=self.flags())
605         m = ldb.Message()
606         m.dn = ldb.Dn(l, "dc=add")
607         m.text["bla"] = ["1234"]
608         m["objectUUID"] = b"0123456789abcdef"
609         l.add(m)
610         try:
611             m = ldb.Message()
612             m.dn = ldb.Dn(l, "dc=add")
613             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
614             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
615             l.modify(m)
616             rm = l.search(m.dn)[0]
617             self.assertEqual(3, len(rm))
618             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
619
620             # Now create another modify, but switch the flags before we do it
621             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
622             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
623             l.modify(m)
624             rm = l.search(m.dn, attrs=["bla"])[0]
625             self.assertEqual(1, len(rm))
626             self.assertEqual(["1234"], list(rm.text["bla"]))
627         finally:
628             l.delete(ldb.Dn(l, "dc=add"))
629
630     def test_transaction_commit(self):
631         l = ldb.Ldb(self.url(), flags=self.flags())
632         l.transaction_start()
633         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
634         m["foo"] = [b"bar"]
635         m["objectUUID"] = b"0123456789abcdef"
636         l.add(m)
637         l.transaction_commit()
638         l.delete(m.dn)
639
640     def test_transaction_cancel(self):
641         l = ldb.Ldb(self.url(), flags=self.flags())
642         l.transaction_start()
643         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
644         m["foo"] = [b"bar"]
645         m["objectUUID"] = b"0123456789abcdee"
646         l.add(m)
647         l.transaction_cancel()
648         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
649
650     def test_set_debug(self):
651         def my_report_fn(level, text):
652             pass
653         l = ldb.Ldb(self.url(), flags=self.flags())
654         l.set_debug(my_report_fn)
655
656     def test_zero_byte_string(self):
657         """Testing we do not get trapped in the \0 byte in a property string."""
658         l = ldb.Ldb(self.url(), flags=self.flags())
659         l.add({
660             "dn" : b"dc=somedn",
661             "objectclass" : b"user",
662             "cN" : b"LDAPtestUSER",
663             "givenname" : b"ldap",
664             "displayname" : b"foo\0bar",
665             "objectUUID" : b"0123456789abcdef"
666         })
667         res = l.search(expression="(dn=dc=somedn)")
668         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
669
670     def test_no_crash_broken_expr(self):
671         l = ldb.Ldb(self.url(), flags=self.flags())
672         self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
673
674 # Run the SimpleLdb tests against an lmdb backend
675 class SimpleLdbLmdb(SimpleLdb):
676
677     def setUp(self):
678         self.prefix = MDB_PREFIX
679         self.index = MDB_INDEX_OBJ
680         super(SimpleLdbLmdb, self).setUp()
681
682     def tearDown(self):
683         super(SimpleLdbLmdb, self).tearDown()
684
685 class SearchTests(LdbBaseTest):
686     def tearDown(self):
687         shutil.rmtree(self.testdir)
688         super(SearchTests, self).tearDown()
689
690         # Ensure the LDB is closed now, so we close the FD
691         del(self.l)
692
693
694     def setUp(self):
695         super(SearchTests, self).setUp()
696         self.testdir = tempdir()
697         self.filename = os.path.join(self.testdir, "search_test.ldb")
698         options = ["modules:rdn_name"]
699         if hasattr(self, 'IDXCHECK'):
700             options.append("disable_full_db_scan_for_self_test:1")
701         self.l = ldb.Ldb(self.url(),
702                          flags=self.flags(),
703                          options=options)
704         try:
705             self.l.add(self.index)
706         except AttributeError:
707             pass
708
709         self.l.add({"dn": "@ATTRIBUTES",
710                     "DC": "CASE_INSENSITIVE"})
711
712         # Note that we can't use the name objectGUID here, as we
713         # want to stay clear of the objectGUID handler in LDB and
714         # instead use just the 16 bytes raw, which we just keep
715         # to printable chars here for ease of handling.
716
717         self.l.add({"dn": "DC=SAMBA,DC=ORG",
718                     "name": b"samba.org",
719                     "objectUUID": b"0123456789abcdef"})
720         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
721                     "name": b"Admins",
722                     "x": "z", "y": "a",
723                     "objectUUID": b"0123456789abcde1"})
724         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
725                     "name": b"Users",
726                     "x": "z", "y": "a",
727                     "objectUUID": b"0123456789abcde2"})
728         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
729                     "name": b"OU #1",
730                     "x": "y", "y": "a",
731                     "objectUUID": b"0123456789abcde3"})
732         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
733                     "name": b"OU #2",
734                     "x": "y", "y": "a",
735                     "objectUUID": b"0123456789abcde4"})
736         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
737                     "name": b"OU #3",
738                     "x": "y", "y": "a",
739                     "objectUUID": b"0123456789abcde5"})
740         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
741                     "name": b"OU #4",
742                     "x": "y", "y": "a",
743                     "objectUUID": b"0123456789abcde6"})
744         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
745                     "name": b"OU #5",
746                     "x": "y", "y": "a",
747                     "objectUUID": b"0123456789abcde7"})
748         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
749                     "name": b"OU #6",
750                     "x": "y", "y": "a",
751                     "objectUUID": b"0123456789abcde8"})
752         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
753                     "name": b"OU #7",
754                     "x": "y", "y": "a",
755                     "objectUUID": b"0123456789abcde9"})
756         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
757                     "name": b"OU #8",
758                     "x": "y", "y": "a",
759                     "objectUUID": b"0123456789abcde0"})
760         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
761                     "name": b"OU #9",
762                     "x": "y", "y": "a",
763                     "objectUUID": b"0123456789abcdea"})
764         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
765                     "name": b"OU #10",
766                     "x": "y", "y": "a",
767                     "objectUUID": b"0123456789abcdeb"})
768         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
769                     "name": b"OU #10",
770                     "x": "y", "y": "a",
771                     "objectUUID": b"0123456789abcdec"})
772         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
773                     "name": b"OU #10",
774                     "x": "y", "y": "b",
775                     "objectUUID": b"0123456789abcded"})
776         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
777                     "name": b"OU #10",
778                     "x": "x", "y": "b",
779                     "objectUUID": b"0123456789abcdee"})
780         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
781                     "name": b"OU #10",
782                     "x": "x", "y": "b",
783                     "objectUUID": b"0123456789abcd01"})
784         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
785                     "name": b"OU #10",
786                     "x": "x", "y": "b",
787                     "objectUUID": b"0123456789abcd02"})
788         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
789                     "name": b"OU #10",
790                     "x": "x", "y": "b",
791                     "objectUUID": b"0123456789abcd03"})
792         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
793                     "name": b"OU #10",
794                     "x": "x", "y": "b",
795                     "objectUUID": b"0123456789abcd04"})
796         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
797                     "name": b"OU #10",
798                     "x": "x", "y": "b",
799                     "objectUUID": b"0123456789abcd05"})
800         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
801                     "name": b"OU #10",
802                     "x": "x", "y": "b",
803                     "objectUUID": b"0123456789abcd06"})
804         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
805                     "name": b"OU #10",
806                     "x": "x", "y": "b",
807                     "objectUUID": b"0123456789abcd07"})
808         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
809                     "name": b"OU #10",
810                     "x": "x", "y": "c",
811                     "objectUUID": b"0123456789abcd08"})
812         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
813                     "name": b"OU #10",
814                     "x": "x", "y": "c",
815                     "objectUUID": b"0123456789abcd09"})
816
817     def test_base(self):
818         """Testing a search"""
819
820         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
821                               scope=ldb.SCOPE_BASE)
822         self.assertEqual(len(res11), 1)
823
824     def test_base_lower(self):
825         """Testing a search"""
826
827         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
828                               scope=ldb.SCOPE_BASE)
829         self.assertEqual(len(res11), 1)
830
831     def test_base_or(self):
832         """Testing a search"""
833
834         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
835                               scope=ldb.SCOPE_BASE,
836                               expression="(|(ou=ou11)(ou=ou12))")
837         self.assertEqual(len(res11), 1)
838
839     def test_base_or2(self):
840         """Testing a search"""
841
842         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
843                               scope=ldb.SCOPE_BASE,
844                               expression="(|(x=y)(y=b))")
845         self.assertEqual(len(res11), 1)
846
847     def test_base_and(self):
848         """Testing a search"""
849
850         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
851                               scope=ldb.SCOPE_BASE,
852                               expression="(&(ou=ou11)(ou=ou12))")
853         self.assertEqual(len(res11), 0)
854
855     def test_base_and2(self):
856         """Testing a search"""
857
858         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
859                               scope=ldb.SCOPE_BASE,
860                               expression="(&(x=y)(y=a))")
861         self.assertEqual(len(res11), 1)
862
863     def test_base_false(self):
864         """Testing a search"""
865
866         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
867                               scope=ldb.SCOPE_BASE,
868                               expression="(|(ou=ou13)(ou=ou12))")
869         self.assertEqual(len(res11), 0)
870
871     def test_check_base_false(self):
872         """Testing a search"""
873         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
874                               scope=ldb.SCOPE_BASE,
875                               expression="(|(ou=ou13)(ou=ou12))")
876         self.assertEqual(len(res11), 0)
877
878     def test_check_base_error(self):
879         """Testing a search"""
880         checkbaseonsearch = {"dn": "@OPTIONS",
881                              "checkBaseOnSearch": b"TRUE"}
882         try:
883             self.l.add(checkbaseonsearch)
884         except ldb.LdbError as err:
885             enum = err.args[0]
886             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
887             m = ldb.Message.from_dict(self.l,
888                                       checkbaseonsearch)
889             self.l.modify(m)
890
891         try:
892             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
893                                   scope=ldb.SCOPE_BASE,
894                                   expression="(|(ou=ou13)(ou=ou12))")
895             self.fail("Should have failed on missing base")
896         except ldb.LdbError as err:
897             enum = err.args[0]
898             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
899
900     def test_subtree_and(self):
901         """Testing a search"""
902
903         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
904                               scope=ldb.SCOPE_SUBTREE,
905                               expression="(&(ou=ou11)(ou=ou12))")
906         self.assertEqual(len(res11), 0)
907
908     def test_subtree_and2(self):
909         """Testing a search"""
910
911         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
912                               scope=ldb.SCOPE_SUBTREE,
913                               expression="(&(x=y)(|(y=b)(y=c)))")
914         self.assertEqual(len(res11), 1)
915
916     def test_subtree_and2_lower(self):
917         """Testing a search"""
918
919         res11 = self.l.search(base="DC=samba,DC=org",
920                               scope=ldb.SCOPE_SUBTREE,
921                               expression="(&(x=y)(|(y=b)(y=c)))")
922         self.assertEqual(len(res11), 1)
923
924     def test_subtree_or(self):
925         """Testing a search"""
926
927         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
928                               scope=ldb.SCOPE_SUBTREE,
929                               expression="(|(ou=ou11)(ou=ou12))")
930         self.assertEqual(len(res11), 2)
931
932     def test_subtree_or2(self):
933         """Testing a search"""
934
935         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
936                               scope=ldb.SCOPE_SUBTREE,
937                               expression="(|(x=y)(y=b))")
938         self.assertEqual(len(res11), 20)
939
940     def test_subtree_or3(self):
941         """Testing a search"""
942
943         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
944                               scope=ldb.SCOPE_SUBTREE,
945                               expression="(|(x=y)(y=b)(y=c))")
946         self.assertEqual(len(res11), 22)
947
948     def test_one_and(self):
949         """Testing a search"""
950
951         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
952                               scope=ldb.SCOPE_ONELEVEL,
953                               expression="(&(ou=ou11)(ou=ou12))")
954         self.assertEqual(len(res11), 0)
955
956     def test_one_and2(self):
957         """Testing a search"""
958
959         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
960                               scope=ldb.SCOPE_ONELEVEL,
961                               expression="(&(x=y)(y=b))")
962         self.assertEqual(len(res11), 1)
963
964     def test_one_or(self):
965         """Testing a search"""
966
967         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
968                               scope=ldb.SCOPE_ONELEVEL,
969                               expression="(|(ou=ou11)(ou=ou12))")
970         self.assertEqual(len(res11), 2)
971
972     def test_one_or2(self):
973         """Testing a search"""
974
975         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
976                               scope=ldb.SCOPE_ONELEVEL,
977                               expression="(|(x=y)(y=b))")
978         self.assertEqual(len(res11), 20)
979
980     def test_one_or2_lower(self):
981         """Testing a search"""
982
983         res11 = self.l.search(base="DC=samba,DC=org",
984                               scope=ldb.SCOPE_ONELEVEL,
985                               expression="(|(x=y)(y=b))")
986         self.assertEqual(len(res11), 20)
987
988     def test_one_unindexable(self):
989         """Testing a search"""
990
991         try:
992             res11 = self.l.search(base="DC=samba,DC=org",
993                                   scope=ldb.SCOPE_ONELEVEL,
994                                   expression="(y=b*)")
995             if hasattr(self, 'IDX') and \
996                not hasattr(self, 'IDXONE') and \
997                hasattr(self, 'IDXCHECK'):
998                 self.fail("Should have failed as un-indexed search")
999
1000             self.assertEqual(len(res11), 9)
1001
1002         except ldb.LdbError as err:
1003             enum = err.args[0]
1004             estr = err.args[1]
1005             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1006             self.assertIn(estr, "ldb FULL SEARCH disabled")
1007
1008     def test_one_unindexable_presence(self):
1009         """Testing a search"""
1010
1011         try:
1012             res11 = self.l.search(base="DC=samba,DC=org",
1013                                   scope=ldb.SCOPE_ONELEVEL,
1014                                   expression="(y=*)")
1015             if hasattr(self, 'IDX') and \
1016                not hasattr(self, 'IDXONE') and \
1017                hasattr(self, 'IDXCHECK'):
1018                 self.fail("Should have failed as un-indexed search")
1019
1020             self.assertEqual(len(res11), 24)
1021
1022         except ldb.LdbError as err:
1023             enum = err.args[0]
1024             estr = err.args[1]
1025             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1026             self.assertIn(estr, "ldb FULL SEARCH disabled")
1027
1028
1029     def test_subtree_and_or(self):
1030         """Testing a search"""
1031
1032         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1033                               scope=ldb.SCOPE_SUBTREE,
1034                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1035         self.assertEqual(len(res11), 0)
1036
1037     def test_subtree_and_or2(self):
1038         """Testing a search"""
1039
1040         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1041                               scope=ldb.SCOPE_SUBTREE,
1042                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1043         self.assertEqual(len(res11), 0)
1044
1045     def test_subtree_and_or3(self):
1046         """Testing a search"""
1047
1048         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1049                               scope=ldb.SCOPE_SUBTREE,
1050                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1051         self.assertEqual(len(res11), 2)
1052
1053     def test_subtree_and_or4(self):
1054         """Testing a search"""
1055
1056         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1057                               scope=ldb.SCOPE_SUBTREE,
1058                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1059         self.assertEqual(len(res11), 2)
1060
1061     def test_subtree_and_or5(self):
1062         """Testing a search"""
1063
1064         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1065                               scope=ldb.SCOPE_SUBTREE,
1066                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1067         self.assertEqual(len(res11), 1)
1068
1069     def test_subtree_or_and(self):
1070         """Testing a search"""
1071
1072         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1073                               scope=ldb.SCOPE_SUBTREE,
1074                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1075         self.assertEqual(len(res11), 10)
1076
1077     def test_subtree_large_and_unique(self):
1078         """Testing a search"""
1079
1080         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1081                               scope=ldb.SCOPE_SUBTREE,
1082                               expression="(&(ou=ou10)(y=a))")
1083         self.assertEqual(len(res11), 1)
1084
1085     def test_subtree_and_none(self):
1086         """Testing a search"""
1087
1088         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1089                               scope=ldb.SCOPE_SUBTREE,
1090                               expression="(&(ou=ouX)(y=a))")
1091         self.assertEqual(len(res11), 0)
1092
1093     def test_subtree_and_idx_record(self):
1094         """Testing a search against the index record"""
1095
1096         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1097                               scope=ldb.SCOPE_SUBTREE,
1098                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1099         self.assertEqual(len(res11), 0)
1100
1101     def test_subtree_and_idxone_record(self):
1102         """Testing a search against the index record"""
1103
1104         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1105                               scope=ldb.SCOPE_SUBTREE,
1106                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1107         self.assertEqual(len(res11), 0)
1108
1109     def test_subtree_unindexable(self):
1110         """Testing a search"""
1111
1112         try:
1113             res11 = self.l.search(base="DC=samba,DC=org",
1114                                   scope=ldb.SCOPE_SUBTREE,
1115                                   expression="(y=b*)")
1116             if hasattr(self, 'IDX') and \
1117                hasattr(self, 'IDXCHECK'):
1118                 self.fail("Should have failed as un-indexed search")
1119
1120             self.assertEqual(len(res11), 9)
1121
1122         except ldb.LdbError as err:
1123             enum = err.args[0]
1124             estr = err.args[1]
1125             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1126             self.assertIn(estr, "ldb FULL SEARCH disabled")
1127
1128     def test_subtree_unindexable_presence(self):
1129         """Testing a search"""
1130
1131         try:
1132             res11 = self.l.search(base="DC=samba,DC=org",
1133                                   scope=ldb.SCOPE_SUBTREE,
1134                                   expression="(y=*)")
1135             if hasattr(self, 'IDX') and \
1136                hasattr(self, 'IDXCHECK'):
1137                 self.fail("Should have failed as un-indexed search")
1138
1139             self.assertEqual(len(res11), 24)
1140
1141         except ldb.LdbError as err:
1142             enum = err.args[0]
1143             estr = err.args[1]
1144             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1145             self.assertIn(estr, "ldb FULL SEARCH disabled")
1146
1147
1148     def test_dn_filter_one(self):
1149         """Testing that a dn= filter succeeds
1150         (or fails with disallowDNFilter
1151         set and IDXGUID or (IDX and not IDXONE) mode)
1152         when the scope is SCOPE_ONELEVEL.
1153
1154         This should be made more consistent, but for now lock in
1155         the behaviour
1156
1157         """
1158
1159         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1160                               scope=ldb.SCOPE_ONELEVEL,
1161                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1162         if hasattr(self, 'disallowDNFilter') and \
1163            hasattr(self, 'IDX') and \
1164            (hasattr(self, 'IDXGUID') or \
1165             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1166             self.assertEqual(len(res11), 0)
1167         else:
1168             self.assertEqual(len(res11), 1)
1169
1170     def test_dn_filter_subtree(self):
1171         """Testing that a dn= filter succeeds
1172         (or fails with disallowDNFilter set)
1173         when the scope is SCOPE_SUBTREE"""
1174
1175         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1176                               scope=ldb.SCOPE_SUBTREE,
1177                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1178         if hasattr(self, 'disallowDNFilter') \
1179            and hasattr(self, 'IDX'):
1180             self.assertEqual(len(res11), 0)
1181         else:
1182             self.assertEqual(len(res11), 1)
1183
1184     def test_dn_filter_base(self):
1185         """Testing that (incorrectly) a dn= filter works
1186         when the scope is SCOPE_BASE"""
1187
1188         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1189                               scope=ldb.SCOPE_BASE,
1190                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1191
1192         # At some point we should fix this, but it isn't trivial
1193         self.assertEqual(len(res11), 1)
1194
1195
1196 # Run the search tests against an lmdb backend
1197 class SearchTestsLmdb(SearchTests):
1198
1199     def setUp(self):
1200         self.prefix = MDB_PREFIX
1201         self.index = MDB_INDEX_OBJ
1202         super(SearchTestsLmdb, self).setUp()
1203
1204     def tearDown(self):
1205         super(SearchTestsLmdb, self).tearDown()
1206
1207
1208 class IndexedSearchTests(SearchTests):
1209     """Test searches using the index, to ensure the index doesn't
1210        break things"""
1211     def setUp(self):
1212         super(IndexedSearchTests, self).setUp()
1213         self.l.add({"dn": "@INDEXLIST",
1214                     "@IDXATTR": [b"x", b"y", b"ou"]})
1215         self.IDX = True
1216
1217 class IndexedCheckSearchTests(IndexedSearchTests):
1218     """Test searches using the index, to ensure the index doesn't
1219        break things (full scan disabled)"""
1220     def setUp(self):
1221         self.IDXCHECK = True
1222         super(IndexedCheckSearchTests, self).setUp()
1223
1224 class IndexedSearchDnFilterTests(SearchTests):
1225     """Test searches using the index, to ensure the index doesn't
1226        break things"""
1227     def setUp(self):
1228         super(IndexedSearchDnFilterTests, self).setUp()
1229         self.l.add({"dn": "@OPTIONS",
1230                     "disallowDNFilter": "TRUE"})
1231         self.disallowDNFilter = True
1232
1233         self.l.add({"dn": "@INDEXLIST",
1234                     "@IDXATTR": [b"x", b"y", b"ou"]})
1235         self.IDX = True
1236
1237 class IndexedAndOneLevelSearchTests(SearchTests):
1238     """Test searches using the index including @IDXONE, to ensure
1239        the index doesn't break things"""
1240     def setUp(self):
1241         super(IndexedAndOneLevelSearchTests, self).setUp()
1242         self.l.add({"dn": "@INDEXLIST",
1243                     "@IDXATTR": [b"x", b"y", b"ou"],
1244                     "@IDXONE": [b"1"]})
1245         self.IDX = True
1246         self.IDXONE = True
1247
1248 class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
1249     """Test searches using the index including @IDXONE, to ensure
1250        the index doesn't break things (full scan disabled)"""
1251     def setUp(self):
1252         self.IDXCHECK = True
1253         super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1254
1255 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1256     """Test searches using the index including @IDXONE, to ensure
1257        the index doesn't break things"""
1258     def setUp(self):
1259         super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1260         self.l.add({"dn": "@OPTIONS",
1261                     "disallowDNFilter": "TRUE"})
1262         self.disallowDNFilter = True
1263
1264         self.l.add({"dn": "@INDEXLIST",
1265                     "@IDXATTR": [b"x", b"y", b"ou"],
1266                     "@IDXONE": [b"1"]})
1267         self.IDX = True
1268         self.IDXONE = True
1269
1270 class GUIDIndexedSearchTests(SearchTests):
1271     """Test searches using the index, to ensure the index doesn't
1272        break things"""
1273     def setUp(self):
1274         self.index = {"dn": "@INDEXLIST",
1275                       "@IDXATTR": [b"x", b"y", b"ou"],
1276                       "@IDXGUID": [b"objectUUID"],
1277                       "@IDX_DN_GUID": [b"GUID"]}
1278         super(GUIDIndexedSearchTests, self).setUp()
1279
1280         self.IDXGUID = True
1281         self.IDXONE = True
1282
1283
1284 class GUIDIndexedDNFilterSearchTests(SearchTests):
1285     """Test searches using the index, to ensure the index doesn't
1286        break things"""
1287     def setUp(self):
1288         self.index = {"dn": "@INDEXLIST",
1289                       "@IDXATTR": [b"x", b"y", b"ou"],
1290                       "@IDXGUID": [b"objectUUID"],
1291                       "@IDX_DN_GUID": [b"GUID"]}
1292         super(GUIDIndexedDNFilterSearchTests, self).setUp()
1293         self.l.add({"dn": "@OPTIONS",
1294                     "disallowDNFilter": "TRUE"})
1295         self.disallowDNFilter = True
1296         self.IDX = True
1297         self.IDXGUID = True
1298
1299 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1300     """Test searches using the index including @IDXONE, to ensure
1301        the index doesn't break things"""
1302     def setUp(self):
1303         self.index = {"dn": "@INDEXLIST",
1304                       "@IDXATTR": [b"x", b"y", b"ou"],
1305                       "@IDXGUID": [b"objectUUID"],
1306                       "@IDX_DN_GUID": [b"GUID"]}
1307         super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1308         self.l.add({"dn": "@OPTIONS",
1309                     "disallowDNFilter": "TRUE"})
1310         self.disallowDNFilter = True
1311         self.IDX = True
1312         self.IDXGUID = True
1313         self.IDXONE = True
1314
1315 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
1316
1317     def setUp(self):
1318         self.prefix = MDB_PREFIX
1319         super(GUIDIndexedSearchTestsLmdb, self).setUp()
1320
1321     def tearDown(self):
1322         super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1323
1324
1325 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
1326
1327     def setUp(self):
1328         self.prefix = MDB_PREFIX
1329         super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
1330
1331     def tearDown(self):
1332         super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1333
1334
1335 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
1336
1337     def setUp(self):
1338         self.prefix = MDB_PREFIX
1339         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
1340
1341     def tearDown(self):
1342         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1343
1344
1345 class AddModifyTests(LdbBaseTest):
1346     def tearDown(self):
1347         shutil.rmtree(self.testdir)
1348         super(AddModifyTests, self).tearDown()
1349
1350         # Ensure the LDB is closed now, so we close the FD
1351         del(self.l)
1352
1353     def setUp(self):
1354         super(AddModifyTests, self).setUp()
1355         self.testdir = tempdir()
1356         self.filename = os.path.join(self.testdir, "add_test.ldb")
1357         self.l = ldb.Ldb(self.url(),
1358                          flags=self.flags(),
1359                          options=["modules:rdn_name"])
1360         try:
1361             self.l.add(self.index)
1362         except AttributeError:
1363             pass
1364
1365         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1366                     "name": b"samba.org",
1367                     "objectUUID": b"0123456789abcdef"})
1368         self.l.add({"dn": "@ATTRIBUTES",
1369                     "objectUUID": "UNIQUE_INDEX"})
1370
1371     def test_add_dup(self):
1372         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1373                     "name": b"Admins",
1374                     "x": "z", "y": "a",
1375                     "objectUUID": b"0123456789abcde1"})
1376         try:
1377             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1378                         "name": b"Admins",
1379                         "x": "z", "y": "a",
1380                         "objectUUID": b"0123456789abcde2"})
1381             self.fail("Should have failed adding dupliate entry")
1382         except ldb.LdbError as err:
1383             enum = err.args[0]
1384             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1385
1386     def test_add_del_add(self):
1387         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1388                     "name": b"Admins",
1389                     "x": "z", "y": "a",
1390                     "objectUUID": b"0123456789abcde1"})
1391         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1392         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1393                     "name": b"Admins",
1394                     "x": "z", "y": "a",
1395                     "objectUUID": b"0123456789abcde2"})
1396
1397     def test_add_move_add(self):
1398         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1399                     "name": b"Admins",
1400                     "x": "z", "y": "a",
1401                     "objectUUID": b"0123456789abcde1"})
1402         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1403                       "OU=DUP2,DC=SAMBA,DC=ORG")
1404         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1405                     "name": b"Admins",
1406                     "x": "z", "y": "a",
1407                     "objectUUID": b"0123456789abcde2"})
1408
1409     def test_add_move_fail_move_move(self):
1410         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1411                     "name": b"Admins",
1412                     "x": "z", "y": "a",
1413                     "objectUUID": b"0123456789abcde1"})
1414         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1415                     "name": b"Admins",
1416                     "x": "z", "y": "a",
1417                     "objectUUID": b"0123456789abcde2"})
1418
1419         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1420                              scope=ldb.SCOPE_SUBTREE,
1421                              expression="(objectUUID=0123456789abcde1)")
1422         self.assertEqual(len(res2), 1)
1423         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1424
1425         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1426                              scope=ldb.SCOPE_SUBTREE,
1427                              expression="(objectUUID=0123456789abcde2)")
1428         self.assertEqual(len(res3), 1)
1429         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1430
1431         try:
1432             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1433                           "OU=DUP2,DC=SAMBA,DC=ORG")
1434             self.fail("Should have failed on duplicate DN")
1435         except ldb.LdbError as err:
1436             enum = err.args[0]
1437             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1438
1439         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1440                       "OU=DUP3,DC=SAMBA,DC=ORG")
1441
1442         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1443                       "OU=DUP2,DC=SAMBA,DC=ORG")
1444
1445         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1446                              scope=ldb.SCOPE_SUBTREE,
1447                              expression="(objectUUID=0123456789abcde1)")
1448         self.assertEqual(len(res2), 1)
1449         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1450
1451         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1452                              scope=ldb.SCOPE_SUBTREE,
1453                              expression="(objectUUID=0123456789abcde2)")
1454         self.assertEqual(len(res3), 1)
1455         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1456
1457     def test_move_missing(self):
1458         try:
1459             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1460                           "OU=DUP2,DC=SAMBA,DC=ORG")
1461             self.fail("Should have failed on missing")
1462         except ldb.LdbError as err:
1463             enum = err.args[0]
1464             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1465
1466     def test_move_missing2(self):
1467         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1468                     "name": b"Admins",
1469                     "x": "z", "y": "a",
1470                     "objectUUID": b"0123456789abcde2"})
1471
1472         try:
1473             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1474                           "OU=DUP2,DC=SAMBA,DC=ORG")
1475             self.fail("Should have failed on missing")
1476         except ldb.LdbError as err:
1477             enum = err.args[0]
1478             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1479
1480     def test_move_fail_move_add(self):
1481         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1482                     "name": b"Admins",
1483                     "x": "z", "y": "a",
1484                     "objectUUID": b"0123456789abcde1"})
1485         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1486                     "name": b"Admins",
1487                     "x": "z", "y": "a",
1488                     "objectUUID": b"0123456789abcde2"})
1489         try:
1490             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1491                           "OU=DUP2,DC=SAMBA,DC=ORG")
1492             self.fail("Should have failed on duplicate DN")
1493         except ldb.LdbError as err:
1494             enum = err.args[0]
1495             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1496
1497         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1498                       "OU=DUP3,DC=SAMBA,DC=ORG")
1499
1500         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1501                     "name": b"Admins",
1502                     "x": "z", "y": "a",
1503                     "objectUUID": b"0123456789abcde3"})
1504
1505
1506 class AddModifyTestsLmdb(AddModifyTests):
1507
1508     def setUp(self):
1509         self.prefix = MDB_PREFIX
1510         self.index = MDB_INDEX_OBJ
1511         super(AddModifyTestsLmdb, self).setUp()
1512
1513     def tearDown(self):
1514         super(AddModifyTestsLmdb, self).tearDown()
1515
1516 class IndexedAddModifyTests(AddModifyTests):
1517     """Test searches using the index, to ensure the index doesn't
1518        break things"""
1519     def setUp(self):
1520         if not hasattr(self, 'index'):
1521             self.index = {"dn": "@INDEXLIST",
1522                           "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
1523                           "@IDXONE": [b"1"]}
1524         super(IndexedAddModifyTests, self).setUp()
1525
1526     def test_duplicate_GUID(self):
1527         try:
1528             self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
1529                         "name": b"Admins",
1530                         "x": "z", "y": "a",
1531                         "objectUUID": b"0123456789abcdef"})
1532             self.fail("Should have failed adding dupliate GUID")
1533         except ldb.LdbError as err:
1534             enum = err.args[0]
1535             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1536
1537     def test_duplicate_name_dup_GUID(self):
1538         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1539                     "name": b"Admins",
1540                     "x": "z", "y": "a",
1541                     "objectUUID": b"a123456789abcdef"})
1542         try:
1543             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1544                         "name": b"Admins",
1545                         "x": "z", "y": "a",
1546                         "objectUUID": b"a123456789abcdef"})
1547             self.fail("Should have failed adding dupliate GUID")
1548         except ldb.LdbError as err:
1549             enum = err.args[0]
1550             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1551
1552     def test_duplicate_name_dup_GUID2(self):
1553         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1554                     "name": b"Admins",
1555                     "x": "z", "y": "a",
1556                     "objectUUID": b"abc3456789abcdef"})
1557         try:
1558             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1559                         "name": b"Admins",
1560                         "x": "z", "y": "a",
1561                         "objectUUID": b"aaa3456789abcdef"})
1562             self.fail("Should have failed adding dupliate DN")
1563         except ldb.LdbError as err:
1564             enum = err.args[0]
1565             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1566
1567         # Checking the GUID didn't stick in the index
1568         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1569                     "name": b"Admins",
1570                     "x": "z", "y": "a",
1571                     "objectUUID": b"aaa3456789abcdef"})
1572
1573     def test_add_dup_guid_add(self):
1574         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1575                     "name": b"Admins",
1576                     "x": "z", "y": "a",
1577                     "objectUUID": b"0123456789abcde1"})
1578         try:
1579             self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1580                         "name": b"Admins",
1581                         "x": "z", "y": "a",
1582                         "objectUUID": b"0123456789abcde1"})
1583             self.fail("Should have failed on duplicate GUID")
1584
1585         except ldb.LdbError as err:
1586             enum = err.args[0]
1587             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1588
1589         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1590                     "name": b"Admins",
1591                     "x": "z", "y": "a",
1592                     "objectUUID": b"0123456789abcde2"})
1593
1594 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
1595     """Test searches using the index, to ensure the index doesn't
1596        break things"""
1597     def setUp(self):
1598         self.index = {"dn": "@INDEXLIST",
1599                       "@IDXATTR": [b"x", b"y", b"ou"],
1600                       "@IDXONE": [b"1"],
1601                       "@IDXGUID": [b"objectUUID"],
1602                       "@IDX_DN_GUID": [b"GUID"]}
1603         super(GUIDIndexedAddModifyTests, self).setUp()
1604
1605
1606 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
1607     """Test GUID index behaviour insdie the transaction"""
1608     def setUp(self):
1609         super(GUIDTransIndexedAddModifyTests, self).setUp()
1610         self.l.transaction_start()
1611
1612     def tearDown(self):
1613         self.l.transaction_commit()
1614         super(GUIDTransIndexedAddModifyTests, self).tearDown()
1615
1616 class TransIndexedAddModifyTests(IndexedAddModifyTests):
1617     """Test index behaviour insdie the transaction"""
1618     def setUp(self):
1619         super(TransIndexedAddModifyTests, self).setUp()
1620         self.l.transaction_start()
1621
1622     def tearDown(self):
1623         self.l.transaction_commit()
1624         super(TransIndexedAddModifyTests, self).tearDown()
1625
1626 class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
1627
1628     def setUp(self):
1629         self.prefix = MDB_PREFIX
1630         super(GuidIndexedAddModifyTestsLmdb, self).setUp()
1631
1632     def tearDown(self):
1633         super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1634
1635 class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
1636
1637     def setUp(self):
1638         self.prefix = MDB_PREFIX
1639         super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
1640
1641     def tearDown(self):
1642         super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1643
1644 class BadIndexTests(LdbBaseTest):
1645     def setUp(self):
1646         super(BadIndexTests, self).setUp()
1647         self.testdir = tempdir()
1648         self.filename = os.path.join(self.testdir, "test.ldb")
1649         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
1650         if hasattr(self, 'IDXGUID'):
1651             self.ldb.add({"dn": "@INDEXLIST",
1652                           "@IDXATTR": [b"x", b"y", b"ou"],
1653                           "@IDXGUID": [b"objectUUID"],
1654                           "@IDX_DN_GUID": [b"GUID"]})
1655         else:
1656             self.ldb.add({"dn": "@INDEXLIST",
1657                           "@IDXATTR": [b"x", b"y", b"ou"]})
1658
1659         super(BadIndexTests, self).setUp()
1660
1661     def test_unique(self):
1662         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1663                       "objectUUID": b"0123456789abcde1",
1664                       "y": "1"})
1665         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1666                       "objectUUID": b"0123456789abcde2",
1667                       "y": "1"})
1668         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1669                       "objectUUID": b"0123456789abcde3",
1670                       "y": "1"})
1671
1672         res = self.ldb.search(expression="(y=1)",
1673                               base="dc=samba,dc=org")
1674         self.assertEquals(len(res), 3)
1675
1676         # Now set this to unique index, but forget to check the result
1677         try:
1678             self.ldb.add({"dn": "@ATTRIBUTES",
1679                         "y": "UNIQUE_INDEX"})
1680             self.fail()
1681         except ldb.LdbError:
1682             pass
1683
1684         # We must still have a working index
1685         res = self.ldb.search(expression="(y=1)",
1686                               base="dc=samba,dc=org")
1687         self.assertEquals(len(res), 3)
1688
1689     def test_unique_transaction(self):
1690         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1691                       "objectUUID": b"0123456789abcde1",
1692                       "y": "1"})
1693         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1694                       "objectUUID": b"0123456789abcde2",
1695                       "y": "1"})
1696         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1697                       "objectUUID": b"0123456789abcde3",
1698                       "y": "1"})
1699
1700         res = self.ldb.search(expression="(y=1)",
1701                               base="dc=samba,dc=org")
1702         self.assertEquals(len(res), 3)
1703
1704         self.ldb.transaction_start()
1705
1706         # Now set this to unique index, but forget to check the result
1707         try:
1708             self.ldb.add({"dn": "@ATTRIBUTES",
1709                         "y": "UNIQUE_INDEX"})
1710         except ldb.LdbError:
1711             pass
1712
1713         try:
1714             self.ldb.transaction_commit()
1715             self.fail()
1716
1717         except ldb.LdbError as err:
1718             enum = err.args[0]
1719             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
1720
1721         # We must still have a working index
1722         res = self.ldb.search(expression="(y=1)",
1723                               base="dc=samba,dc=org")
1724
1725         self.assertEquals(len(res), 3)
1726
1727     def test_casefold(self):
1728         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1729                       "objectUUID": b"0123456789abcde1",
1730                       "y": "a"})
1731         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1732                       "objectUUID": b"0123456789abcde2",
1733                       "y": "A"})
1734         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1735                       "objectUUID": b"0123456789abcde3",
1736                       "y": ["a", "A"]})
1737
1738         res = self.ldb.search(expression="(y=a)",
1739                               base="dc=samba,dc=org")
1740         self.assertEquals(len(res), 2)
1741
1742         self.ldb.add({"dn": "@ATTRIBUTES",
1743                       "y": "CASE_INSENSITIVE"})
1744
1745         # We must still have a working index
1746         res = self.ldb.search(expression="(y=a)",
1747                               base="dc=samba,dc=org")
1748
1749         if hasattr(self, 'IDXGUID'):
1750             self.assertEquals(len(res), 3)
1751         else:
1752             # We should not return this entry twice, but sadly
1753             # we have not yet fixed
1754             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1755             self.assertEquals(len(res), 4)
1756
1757     def test_casefold_transaction(self):
1758         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1759                       "objectUUID": b"0123456789abcde1",
1760                       "y": "a"})
1761         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1762                       "objectUUID": b"0123456789abcde2",
1763                       "y": "A"})
1764         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1765                       "objectUUID": b"0123456789abcde3",
1766                       "y": ["a", "A"]})
1767
1768         res = self.ldb.search(expression="(y=a)",
1769                               base="dc=samba,dc=org")
1770         self.assertEquals(len(res), 2)
1771
1772         self.ldb.transaction_start()
1773
1774         self.ldb.add({"dn": "@ATTRIBUTES",
1775                       "y": "CASE_INSENSITIVE"})
1776
1777         self.ldb.transaction_commit()
1778
1779         # We must still have a working index
1780         res = self.ldb.search(expression="(y=a)",
1781                               base="dc=samba,dc=org")
1782
1783         if hasattr(self, 'IDXGUID'):
1784             self.assertEquals(len(res), 3)
1785         else:
1786             # We should not return this entry twice, but sadly
1787             # we have not yet fixed
1788             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1789             self.assertEquals(len(res), 4)
1790
1791
1792     def tearDown(self):
1793         super(BadIndexTests, self).tearDown()
1794
1795
1796 class GUIDBadIndexTests(BadIndexTests):
1797     """Test Bad index things with GUID index mode"""
1798     def setUp(self):
1799         self.IDXGUID = True
1800
1801         super(GUIDBadIndexTests, self).setUp()
1802
1803 class DnTests(TestCase):
1804
1805     def setUp(self):
1806         super(DnTests, self).setUp()
1807         self.ldb = ldb.Ldb()
1808
1809     def tearDown(self):
1810         super(DnTests, self).tearDown()
1811         del(self.ldb)
1812
1813     def test_set_dn_invalid(self):
1814         x = ldb.Message()
1815         def assign():
1816             x.dn = "astring"
1817         self.assertRaises(TypeError, assign)
1818
1819     def test_eq(self):
1820         x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1821         y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1822         self.assertEqual(x, y)
1823         y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
1824         self.assertNotEqual(x, y)
1825
1826     def test_str(self):
1827         x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
1828         self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
1829
1830     def test_repr(self):
1831         x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
1832         self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
1833
1834     def test_get_casefold(self):
1835         x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
1836         self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
1837
1838     def test_validate(self):
1839         x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
1840         self.assertTrue(x.validate())
1841
1842     def test_parent(self):
1843         x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
1844         self.assertEqual("bar=bloe", x.parent().__str__())
1845
1846     def test_parent_nonexistent(self):
1847         x = ldb.Dn(self.ldb, "@BLA")
1848         self.assertEqual(None, x.parent())
1849
1850     def test_is_valid(self):
1851         x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
1852         self.assertTrue(x.is_valid())
1853         x = ldb.Dn(self.ldb, "")
1854         self.assertTrue(x.is_valid())
1855
1856     def test_is_special(self):
1857         x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
1858         self.assertFalse(x.is_special())
1859         x = ldb.Dn(self.ldb, "@FOOBAR")
1860         self.assertTrue(x.is_special())
1861
1862     def test_check_special(self):
1863         x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
1864         self.assertFalse(x.check_special("FOOBAR"))
1865         x = ldb.Dn(self.ldb, "@FOOBAR")
1866         self.assertTrue(x.check_special("@FOOBAR"))
1867
1868     def test_len(self):
1869         x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
1870         self.assertEqual(2, len(x))
1871         x = ldb.Dn(self.ldb, "dc=foo21")
1872         self.assertEqual(1, len(x))
1873
1874     def test_add_child(self):
1875         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1876         self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
1877         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1878
1879     def test_add_base(self):
1880         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1881         base = ldb.Dn(self.ldb, "bla=bloe")
1882         self.assertTrue(x.add_base(base))
1883         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1884
1885     def test_add_child_str(self):
1886         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1887         self.assertTrue(x.add_child("bla=bloe"))
1888         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1889
1890     def test_add_base_str(self):
1891         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1892         base = "bla=bloe"
1893         self.assertTrue(x.add_base(base))
1894         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1895
1896     def test_add(self):
1897         x = ldb.Dn(self.ldb, "dc=foo24")
1898         y = ldb.Dn(self.ldb, "bar=bla")
1899         self.assertEqual("dc=foo24,bar=bla", str(x + y))
1900
1901     def test_remove_base_components(self):
1902         x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
1903         x.remove_base_components(len(x)-1)
1904         self.assertEqual("dc=foo24", str(x))
1905
1906     def test_parse_ldif(self):
1907         msgs = self.ldb.parse_ldif("dn: foo=bar\n")
1908         msg = next(msgs)
1909         self.assertEqual("foo=bar", str(msg[1].dn))
1910         self.assertTrue(isinstance(msg[1], ldb.Message))
1911         ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
1912         self.assertEqual("dn: foo=bar\n\n", ldif)
1913
1914     def test_parse_ldif_more(self):
1915         msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
1916         msg = next(msgs)
1917         self.assertEqual("foo=bar", str(msg[1].dn))
1918         msg = next(msgs)
1919         self.assertEqual("bar=bar", str(msg[1].dn))
1920
1921     def test_canonical_string(self):
1922         x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
1923         self.assertEqual("/bloe/foo25", x.canonical_str())
1924
1925     def test_canonical_ex_string(self):
1926         x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
1927         self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
1928
1929     def test_ldb_is_child_of(self):
1930         """Testing ldb_dn_compare_dn"""
1931         dn1 = ldb.Dn(self.ldb, "dc=base")
1932         dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
1933         dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
1934         dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
1935
1936         self.assertTrue(dn1.is_child_of(dn1))
1937         self.assertTrue(dn2.is_child_of(dn1))
1938         self.assertTrue(dn4.is_child_of(dn1))
1939         self.assertTrue(dn4.is_child_of(dn3))
1940         self.assertTrue(dn4.is_child_of(dn4))
1941         self.assertFalse(dn3.is_child_of(dn2))
1942         self.assertFalse(dn1.is_child_of(dn4))
1943
1944     def test_ldb_is_child_of_str(self):
1945         """Testing ldb_dn_compare_dn"""
1946         dn1_str = "dc=base"
1947         dn2_str = "cn=foo,dc=base"
1948         dn3_str = "cn=bar,dc=base"
1949         dn4_str = "cn=baz,cn=bar,dc=base"
1950
1951         dn1 = ldb.Dn(self.ldb, dn1_str)
1952         dn2 = ldb.Dn(self.ldb, dn2_str)
1953         dn3 = ldb.Dn(self.ldb, dn3_str)
1954         dn4 = ldb.Dn(self.ldb, dn4_str)
1955
1956         self.assertTrue(dn1.is_child_of(dn1_str))
1957         self.assertTrue(dn2.is_child_of(dn1_str))
1958         self.assertTrue(dn4.is_child_of(dn1_str))
1959         self.assertTrue(dn4.is_child_of(dn3_str))
1960         self.assertTrue(dn4.is_child_of(dn4_str))
1961         self.assertFalse(dn3.is_child_of(dn2_str))
1962         self.assertFalse(dn1.is_child_of(dn4_str))
1963
1964     def test_get_component_name(self):
1965         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1966         self.assertEqual(dn.get_component_name(0), 'cn')
1967         self.assertEqual(dn.get_component_name(1), 'dc')
1968         self.assertEqual(dn.get_component_name(2), None)
1969         self.assertEqual(dn.get_component_name(-1), None)
1970
1971     def test_get_component_value(self):
1972         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1973         self.assertEqual(dn.get_component_value(0), 'foo')
1974         self.assertEqual(dn.get_component_value(1), 'base')
1975         self.assertEqual(dn.get_component_name(2), None)
1976         self.assertEqual(dn.get_component_name(-1), None)
1977
1978     def test_set_component(self):
1979         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1980         dn.set_component(0, 'cn', 'bar')
1981         self.assertEqual(str(dn), "cn=bar,dc=base")
1982         dn.set_component(1, 'o', 'asep')
1983         self.assertEqual(str(dn), "cn=bar,o=asep")
1984         self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
1985         self.assertEqual(str(dn), "cn=bar,o=asep")
1986         dn.set_component(1, 'o', 'a,b+c')
1987         self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
1988
1989     def test_set_component_bytes(self):
1990         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1991         dn.set_component(0, 'cn', b'bar')
1992         self.assertEqual(str(dn), "cn=bar,dc=base")
1993         dn.set_component(1, 'o', b'asep')
1994         self.assertEqual(str(dn), "cn=bar,o=asep")
1995
1996     def test_set_component_none(self):
1997         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1998         self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
1999
2000     def test_get_extended_component_null(self):
2001         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2002         self.assertEqual(dn.get_extended_component("TEST"), None)
2003
2004     def test_get_extended_component(self):
2005         self.ldb._register_test_extensions()
2006         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2007         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2008
2009     def test_set_extended_component(self):
2010         self.ldb._register_test_extensions()
2011         dn = ldb.Dn(self.ldb, "dc=base")
2012         dn.set_extended_component("TEST", "foo")
2013         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2014         dn.set_extended_component("TEST", b"bar")
2015         self.assertEqual(dn.get_extended_component("TEST"), b"bar")
2016
2017     def test_extended_str(self):
2018         self.ldb._register_test_extensions()
2019         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2020         self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
2021
2022     def test_get_rdn_name(self):
2023         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2024         self.assertEqual(dn.get_rdn_name(), 'cn')
2025
2026     def test_get_rdn_value(self):
2027         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2028         self.assertEqual(dn.get_rdn_value(), 'foo')
2029
2030     def test_get_casefold(self):
2031         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2032         self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
2033
2034     def test_get_linearized(self):
2035         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2036         self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
2037
2038     def test_is_null(self):
2039         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2040         self.assertFalse(dn.is_null())
2041
2042         dn = ldb.Dn(self.ldb, '')
2043         self.assertTrue(dn.is_null())
2044
2045 class LdbMsgTests(TestCase):
2046
2047     def setUp(self):
2048         super(LdbMsgTests, self).setUp()
2049         self.msg = ldb.Message()
2050
2051     def test_init_dn(self):
2052         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
2053         self.assertEqual("dc=foo27", str(self.msg.dn))
2054
2055     def test_iter_items(self):
2056         self.assertEqual(0, len(self.msg.items()))
2057         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
2058         self.assertEqual(1, len(self.msg.items()))
2059
2060     def test_repr(self):
2061         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2062         self.msg["dc"] = b"foo"
2063         if PY3:
2064             self.assertIn(repr(self.msg), [
2065                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2066                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2067             ])
2068             self.assertIn(repr(self.msg.text), [
2069                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2070                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2071             ])
2072         else:
2073             self.assertEqual(
2074                 repr(self.msg),
2075                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
2076             self.assertEqual(
2077                 repr(self.msg.text),
2078                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
2079
2080     def test_len(self):
2081         self.assertEqual(0, len(self.msg))
2082
2083     def test_notpresent(self):
2084         self.assertRaises(KeyError, lambda: self.msg["foo"])
2085
2086     def test_del(self):
2087         del self.msg["foo"]
2088
2089     def test_add(self):
2090         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2091
2092     def test_add_text(self):
2093         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2094
2095     def test_elements_empty(self):
2096         self.assertEqual([], self.msg.elements())
2097
2098     def test_elements(self):
2099         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2100         self.msg.add(el)
2101         self.assertEqual([el], self.msg.elements())
2102         self.assertEqual([el.text], self.msg.text.elements())
2103
2104     def test_add_value(self):
2105         self.assertEqual(0, len(self.msg))
2106         self.msg["foo"] = [b"foo"]
2107         self.assertEqual(1, len(self.msg))
2108
2109     def test_add_value_text(self):
2110         self.assertEqual(0, len(self.msg))
2111         self.msg["foo"] = ["foo"]
2112         self.assertEqual(1, len(self.msg))
2113
2114     def test_add_value_multiple(self):
2115         self.assertEqual(0, len(self.msg))
2116         self.msg["foo"] = [b"foo", b"bla"]
2117         self.assertEqual(1, len(self.msg))
2118         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2119
2120     def test_add_value_multiple_text(self):
2121         self.assertEqual(0, len(self.msg))
2122         self.msg["foo"] = ["foo", "bla"]
2123         self.assertEqual(1, len(self.msg))
2124         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2125
2126     def test_set_value(self):
2127         self.msg["foo"] = [b"fool"]
2128         self.assertEqual([b"fool"], list(self.msg["foo"]))
2129         self.msg["foo"] = [b"bar"]
2130         self.assertEqual([b"bar"], list(self.msg["foo"]))
2131
2132     def test_set_value_text(self):
2133         self.msg["foo"] = ["fool"]
2134         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2135         self.msg["foo"] = ["bar"]
2136         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2137
2138     def test_keys(self):
2139         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2140         self.msg["foo"] = [b"bla"]
2141         self.msg["bar"] = [b"bla"]
2142         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
2143
2144     def test_keys_text(self):
2145         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2146         self.msg["foo"] = ["bla"]
2147         self.msg["bar"] = ["bla"]
2148         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
2149
2150     def test_dn(self):
2151         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2152         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2153
2154     def test_get_dn(self):
2155         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2156         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2157
2158     def test_dn_text(self):
2159         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2160         self.assertEqual("@BASEINFO", str(self.msg.dn))
2161         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2162
2163     def test_get_dn_text(self):
2164         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2165         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2166         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2167
2168     def test_get_invalid(self):
2169         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2170         self.assertRaises(TypeError, self.msg.get, 42)
2171
2172     def test_get_other(self):
2173         self.msg["foo"] = [b"bar"]
2174         self.assertEqual(b"bar", self.msg.get("foo")[0])
2175         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2176         self.assertEqual(None, self.msg.get("foo", idx=1))
2177         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2178
2179     def test_get_other_text(self):
2180         self.msg["foo"] = ["bar"]
2181         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2182         self.assertEqual("bar", self.msg.text.get("foo")[0])
2183         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2184         self.assertEqual(None, self.msg.get("foo", idx=1))
2185         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2186
2187     def test_get_default(self):
2188         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2189         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2190
2191     def test_get_default_text(self):
2192         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2193         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2194
2195     def test_get_unknown(self):
2196         self.assertEqual(None, self.msg.get("lalalala"))
2197
2198     def test_get_unknown_text(self):
2199         self.assertEqual(None, self.msg.text.get("lalalala"))
2200
2201     def test_msg_diff(self):
2202         l = ldb.Ldb()
2203         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2204         msg1 = next(msgs)[1]
2205         msg2 = next(msgs)[1]
2206         msgdiff = l.msg_diff(msg1, msg2)
2207         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2208         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2209         self.assertEqual(1, len(msgdiff))
2210
2211     def test_equal_empty(self):
2212         msg1 = ldb.Message()
2213         msg2 = ldb.Message()
2214         self.assertEqual(msg1, msg2)
2215
2216     def test_equal_simplel(self):
2217         db = ldb.Ldb()
2218         msg1 = ldb.Message()
2219         msg1.dn = ldb.Dn(db, "foo=bar")
2220         msg2 = ldb.Message()
2221         msg2.dn = ldb.Dn(db, "foo=bar")
2222         self.assertEqual(msg1, msg2)
2223         msg1['foo'] = b'bar'
2224         msg2['foo'] = b'bar'
2225         self.assertEqual(msg1, msg2)
2226         msg2['foo'] = b'blie'
2227         self.assertNotEqual(msg1, msg2)
2228         msg2['foo'] = b'blie'
2229
2230     def test_from_dict(self):
2231         rec = {"dn": "dc=fromdict",
2232                "a1": [b"a1-val1", b"a1-val1"]}
2233         l = ldb.Ldb()
2234         # check different types of input Flags
2235         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2236             m = ldb.Message.from_dict(l, rec, flags)
2237             self.assertEqual(rec["a1"], list(m["a1"]))
2238             self.assertEqual(flags, m["a1"].flags())
2239         # check input params
2240         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2241         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2242         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2243         # Message.from_dict expects dictionary with 'dn'
2244         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2245         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2246
2247     def test_from_dict_text(self):
2248         rec = {"dn": "dc=fromdict",
2249                "a1": ["a1-val1", "a1-val1"]}
2250         l = ldb.Ldb()
2251         # check different types of input Flags
2252         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2253             m = ldb.Message.from_dict(l, rec, flags)
2254             self.assertEqual(rec["a1"], list(m.text["a1"]))
2255             self.assertEqual(flags, m.text["a1"].flags())
2256         # check input params
2257         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2258         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2259         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2260         # Message.from_dict expects dictionary with 'dn'
2261         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2262         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2263
2264     def test_copy_add_message_element(self):
2265         m = ldb.Message()
2266         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2267         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2268         mto = ldb.Message()
2269         mto["1"] = m["1"]
2270         mto["2"] = m["2"]
2271         self.assertEqual(mto["1"], m["1"])
2272         self.assertEqual(mto["2"], m["2"])
2273         mto = ldb.Message()
2274         mto.add(m["1"])
2275         mto.add(m["2"])
2276         self.assertEqual(mto["1"], m["1"])
2277         self.assertEqual(mto["2"], m["2"])
2278
2279     def test_copy_add_message_element_text(self):
2280         m = ldb.Message()
2281         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2282         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2283         mto = ldb.Message()
2284         mto["1"] = m["1"]
2285         mto["2"] = m["2"]
2286         self.assertEqual(mto["1"], m.text["1"])
2287         self.assertEqual(mto["2"], m.text["2"])
2288         mto = ldb.Message()
2289         mto.add(m["1"])
2290         mto.add(m["2"])
2291         self.assertEqual(mto.text["1"], m.text["1"])
2292         self.assertEqual(mto.text["2"], m.text["2"])
2293         self.assertEqual(mto["1"], m["1"])
2294         self.assertEqual(mto["2"], m["2"])
2295
2296
2297 class MessageElementTests(TestCase):
2298
2299     def test_cmp_element(self):
2300         x = ldb.MessageElement([b"foo"])
2301         y = ldb.MessageElement([b"foo"])
2302         z = ldb.MessageElement([b"bzr"])
2303         self.assertEqual(x, y)
2304         self.assertNotEqual(x, z)
2305
2306     def test_cmp_element_text(self):
2307         x = ldb.MessageElement([b"foo"])
2308         y = ldb.MessageElement(["foo"])
2309         self.assertEqual(x, y)
2310
2311     def test_create_iterable(self):
2312         x = ldb.MessageElement([b"foo"])
2313         self.assertEqual([b"foo"], list(x))
2314         self.assertEqual(["foo"], list(x.text))
2315
2316     def test_repr(self):
2317         x = ldb.MessageElement([b"foo"])
2318         if PY3:
2319             self.assertEqual("MessageElement([b'foo'])", repr(x))
2320             self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
2321         else:
2322             self.assertEqual("MessageElement(['foo'])", repr(x))
2323             self.assertEqual("MessageElement(['foo']).text", repr(x.text))
2324         x = ldb.MessageElement([b"foo", b"bla"])
2325         self.assertEqual(2, len(x))
2326         if PY3:
2327             self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
2328             self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
2329         else:
2330             self.assertEqual("MessageElement(['foo','bla'])", repr(x))
2331             self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
2332
2333     def test_get_item(self):
2334         x = ldb.MessageElement([b"foo", b"bar"])
2335         self.assertEqual(b"foo", x[0])
2336         self.assertEqual(b"bar", x[1])
2337         self.assertEqual(b"bar", x[-1])
2338         self.assertRaises(IndexError, lambda: x[45])
2339
2340     def test_get_item_text(self):
2341         x = ldb.MessageElement(["foo", "bar"])
2342         self.assertEqual("foo", x.text[0])
2343         self.assertEqual("bar", x.text[1])
2344         self.assertEqual("bar", x.text[-1])
2345         self.assertRaises(IndexError, lambda: x[45])
2346
2347     def test_len(self):
2348         x = ldb.MessageElement([b"foo", b"bar"])
2349         self.assertEqual(2, len(x))
2350
2351     def test_eq(self):
2352         x = ldb.MessageElement([b"foo", b"bar"])
2353         y = ldb.MessageElement([b"foo", b"bar"])
2354         self.assertEqual(y, x)
2355         x = ldb.MessageElement([b"foo"])
2356         self.assertNotEqual(y, x)
2357         y = ldb.MessageElement([b"foo"])
2358         self.assertEqual(y, x)
2359
2360     def test_extended(self):
2361         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2362         if PY3:
2363             self.assertEqual("MessageElement([b'456'])", repr(el))
2364             self.assertEqual("MessageElement([b'456']).text", repr(el.text))
2365         else:
2366             self.assertEqual("MessageElement(['456'])", repr(el))
2367             self.assertEqual("MessageElement(['456']).text", repr(el.text))
2368
2369     def test_bad_text(self):
2370         el = ldb.MessageElement(b'\xba\xdd')
2371         self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2372
2373
2374 class ModuleTests(TestCase):
2375
2376     def setUp(self):
2377         super(ModuleTests, self).setUp()
2378         self.testdir = tempdir()
2379         self.filename = os.path.join(self.testdir, "test.ldb")
2380         self.ldb = ldb.Ldb(self.filename)
2381
2382     def tearDown(self):
2383         shutil.rmtree(self.testdir)
2384         super(ModuleTests, self).setUp()
2385
2386     def test_register_module(self):
2387         class ExampleModule:
2388             name = "example"
2389         ldb.register_module(ExampleModule)
2390
2391     def test_use_module(self):
2392         ops = []
2393         class ExampleModule:
2394             name = "bla"
2395
2396             def __init__(self, ldb, next):
2397                 ops.append("init")
2398                 self.next = next
2399
2400             def search(self, *args, **kwargs):
2401                 return self.next.search(*args, **kwargs)
2402
2403             def request(self, *args, **kwargs):
2404                 pass
2405
2406         ldb.register_module(ExampleModule)
2407         l = ldb.Ldb(self.filename)
2408         l.add({"dn": "@MODULES", "@LIST": "bla"})
2409         self.assertEqual([], ops)
2410         l = ldb.Ldb(self.filename)
2411         self.assertEqual(["init"], ops)
2412
2413 class LdbResultTests(LdbBaseTest):
2414
2415     def setUp(self):
2416         super(LdbResultTests, self).setUp()
2417         self.testdir = tempdir()
2418         self.filename = os.path.join(self.testdir, "test.ldb")
2419         self.l = ldb.Ldb(self.url(), flags=self.flags())
2420         try:
2421             self.l.add(self.index)
2422         except AttributeError:
2423             pass
2424         self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
2425                     "objectUUID": b"0123456789abcde0"})
2426         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
2427                     "objectUUID": b"0123456789abcde1"})
2428         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
2429                     "objectUUID": b"0123456789abcde2"})
2430         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
2431                     "objectUUID": b"0123456789abcde3"})
2432         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
2433                     "objectUUID": b"0123456789abcde4"})
2434         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
2435                     "objectUUID": b"0123456789abcde5"})
2436         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
2437                     "objectUUID": b"0123456789abcde6"})
2438         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
2439                     "objectUUID": b"0123456789abcde7"})
2440         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
2441                     "objectUUID": b"0123456789abcde8"})
2442         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
2443                     "objectUUID": b"0123456789abcde9"})
2444         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
2445                     "objectUUID": b"0123456789abcdea"})
2446         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
2447                     "objectUUID": b"0123456789abcdeb"})
2448         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
2449                     "objectUUID": b"0123456789abcdec"})
2450
2451     def tearDown(self):
2452         shutil.rmtree(self.testdir)
2453         super(LdbResultTests, self).tearDown()
2454         # Ensure the LDB is closed now, so we close the FD
2455         del(self.l)
2456
2457     def test_return_type(self):
2458         res = self.l.search()
2459         self.assertEqual(str(res), "<ldb result>")
2460
2461     def test_get_msgs(self):
2462         res = self.l.search()
2463         list = res.msgs
2464
2465     def test_get_controls(self):
2466         res = self.l.search()
2467         list = res.controls
2468
2469     def test_get_referals(self):
2470         res = self.l.search()
2471         list = res.referals
2472
2473     def test_iter_msgs(self):
2474         found = False
2475         for l in self.l.search().msgs:
2476             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2477                 found = True
2478         self.assertTrue(found)
2479
2480     def test_iter_msgs_count(self):
2481         self.assertTrue(self.l.search().count > 0)
2482         # 13 objects has been added to the DC=SAMBA, DC=ORG
2483         self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2484
2485     def test_iter_controls(self):
2486         res = self.l.search().controls
2487         it = iter(res)
2488
2489     def test_create_control(self):
2490         self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2491         c = ldb.Control(self.l, "relax:1")
2492         self.assertEqual(c.critical, True)
2493         self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
2494
2495     def test_iter_refs(self):
2496         res = self.l.search().referals
2497         it = iter(res)
2498
2499     def test_search_sequence_msgs(self):
2500         found = False
2501         res = self.l.search().msgs
2502
2503         for i in range(0, len(res)):
2504             l = res[i]
2505             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2506                 found = True
2507         self.assertTrue(found)
2508
2509     def test_search_as_iter(self):
2510         found = False
2511         res = self.l.search()
2512
2513         for l in res:
2514             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2515                 found = True
2516         self.assertTrue(found)
2517
2518     def test_search_iter(self):
2519         found = False
2520         res = self.l.search_iterator()
2521
2522         for l in res:
2523             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2524                 found = True
2525         self.assertTrue(found)
2526
2527
2528     # Show that search results can't see into a transaction
2529     def test_search_against_trans(self):
2530         found11 = False
2531
2532         (r1, w1) = os.pipe()
2533
2534         (r2, w2) = os.pipe()
2535
2536         # For the first element, fork a child that will
2537         # write to the DB
2538         pid = os.fork()
2539         if pid == 0:
2540             # In the child, re-open
2541             del(self.l)
2542             gc.collect()
2543
2544             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2545             # start a transaction
2546             child_ldb.transaction_start()
2547
2548             # write to it
2549             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2550                            "name": b"samba.org",
2551                            "objectUUID": b"o123456789acbdef"})
2552
2553             os.write(w1, b"added")
2554
2555             # Now wait for the search to be done
2556             os.read(r2, 6)
2557
2558             # and commit
2559             try:
2560                 child_ldb.transaction_commit()
2561             except LdbError as err:
2562                 # We print this here to see what went wrong in the child
2563                 print(err)
2564                 os._exit(1)
2565
2566             os.write(w1, b"transaction")
2567             os._exit(0)
2568
2569         self.assertEqual(os.read(r1, 5), b"added")
2570
2571         # This should not turn up until the transaction is concluded
2572         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2573                             scope=ldb.SCOPE_BASE)
2574         self.assertEqual(len(res11), 0)
2575
2576         os.write(w2, b"search")
2577
2578         # Now wait for the transaction to be done.  This should
2579         # deadlock, but the search doesn't hold a read lock for the
2580         # iterator lifetime currently.
2581         self.assertEqual(os.read(r1, 11), b"transaction")
2582
2583         # This should now turn up, as the transaction is over
2584         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2585                             scope=ldb.SCOPE_BASE)
2586         self.assertEqual(len(res11), 1)
2587
2588         self.assertFalse(found11)
2589
2590         (got_pid, status) = os.waitpid(pid, 0)
2591         self.assertEqual(got_pid, pid)
2592
2593
2594     def test_search_iter_against_trans(self):
2595         found = False
2596         found11 = False
2597
2598         # We need to hold this iterator open to hold the all-record
2599         # lock
2600         res = self.l.search_iterator()
2601
2602         (r1, w1) = os.pipe()
2603
2604         (r2, w2) = os.pipe()
2605
2606         # For the first element, with the sequence open (which
2607         # means with ldb locks held), fork a child that will
2608         # write to the DB
2609         pid = os.fork()
2610         if pid == 0:
2611             # In the child, re-open
2612             del(res)
2613             del(self.l)
2614             gc.collect()
2615
2616             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2617             # start a transaction
2618             child_ldb.transaction_start()
2619
2620             # write to it
2621             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2622                            "name": b"samba.org",
2623                            "objectUUID": b"o123456789acbdef"})
2624
2625             os.write(w1, b"added")
2626
2627             # Now wait for the search to be done
2628             os.read(r2, 6)
2629
2630             # and commit
2631             try:
2632                 child_ldb.transaction_commit()
2633             except LdbError as err:
2634                 # We print this here to see what went wrong in the child
2635                 print(err)
2636                 os._exit(1)
2637
2638             os.write(w1, b"transaction")
2639             os._exit(0)
2640
2641         self.assertEqual(os.read(r1, 5), b"added")
2642
2643         # This should not turn up until the transaction is concluded
2644         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2645                             scope=ldb.SCOPE_BASE)
2646         self.assertEqual(len(res11), 0)
2647
2648         os.write(w2, b"search")
2649
2650         # allow the transaction to start
2651         time.sleep(1)
2652
2653         # This should not turn up until the search finishes and
2654         # removed the read lock, but for ldb_tdb that happened as soon
2655         # as we called the first res.next()
2656         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2657                             scope=ldb.SCOPE_BASE)
2658         self.assertEqual(len(res11), 0)
2659
2660         # These results are all collected at the first next(res) call
2661         for l in res:
2662             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2663                 found = True
2664             if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2665                 found11 = True
2666
2667         # Now wait for the transaction to be done.
2668         self.assertEqual(os.read(r1, 11), b"transaction")
2669
2670         # This should now turn up, as the transaction is over and all
2671         # read locks are gone
2672         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2673                             scope=ldb.SCOPE_BASE)
2674         self.assertEqual(len(res11), 1)
2675
2676         self.assertTrue(found)
2677         self.assertFalse(found11)
2678
2679         (got_pid, status) = os.waitpid(pid, 0)
2680         self.assertEqual(got_pid, pid)
2681
2682
2683 class LdbResultTestsLmdb(LdbResultTests):
2684
2685     def setUp(self):
2686         self.prefix = MDB_PREFIX
2687         self.index = MDB_INDEX_OBJ
2688         super(LdbResultTestsLmdb, self).setUp()
2689
2690     def tearDown(self):
2691         super(LdbResultTestsLmdb, self).tearDown()
2692
2693
2694 class BadTypeTests(TestCase):
2695     def test_control(self):
2696         l = ldb.Ldb()
2697         self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
2698         self.assertRaises(TypeError, ldb.Control, ldb, 1234)
2699
2700     def test_modify(self):
2701         l = ldb.Ldb()
2702         dn = ldb.Dn(l, 'a=b')
2703         m = ldb.Message(dn)
2704         self.assertRaises(TypeError, l.modify, '<bad type>')
2705         self.assertRaises(TypeError, l.modify, m, '<bad type>')
2706
2707     def test_add(self):
2708         l = ldb.Ldb()
2709         dn = ldb.Dn(l, 'a=b')
2710         m = ldb.Message(dn)
2711         self.assertRaises(TypeError, l.add, '<bad type>')
2712         self.assertRaises(TypeError, l.add, m, '<bad type>')
2713
2714     def test_delete(self):
2715         l = ldb.Ldb()
2716         dn = ldb.Dn(l, 'a=b')
2717         self.assertRaises(TypeError, l.add, '<bad type>')
2718         self.assertRaises(TypeError, l.add, dn, '<bad type>')
2719
2720     def test_rename(self):
2721         l = ldb.Ldb()
2722         dn = ldb.Dn(l, 'a=b')
2723         self.assertRaises(TypeError, l.add, '<bad type>', dn)
2724         self.assertRaises(TypeError, l.add, dn, '<bad type>')
2725         self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
2726
2727     def test_search(self):
2728         l = ldb.Ldb()
2729         self.assertRaises(TypeError, l.search, base=1234)
2730         self.assertRaises(TypeError, l.search, scope='<bad type>')
2731         self.assertRaises(TypeError, l.search, expression=1234)
2732         self.assertRaises(TypeError, l.search, attrs='<bad type>')
2733         self.assertRaises(TypeError, l.search, controls='<bad type>')
2734
2735
2736 class VersionTests(TestCase):
2737
2738     def test_version(self):
2739         self.assertTrue(isinstance(ldb.__version__, str))
2740
2741
2742 if __name__ == '__main__':
2743     import unittest
2744     unittest.TestProgram()