c88ddc7826ef56503b7c8126ed583c3b87dc6119
[nivanova/samba-autobuild/.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the interpreter running the tests is Python 3.
PY3 = sys.version_info > (3, 0)

# URL scheme prefixes; LdbBaseTest.url() prepends one of these to the
# database file name to select the storage backend.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"

# Record that the lmdb test variants assign to self.index; the setUp()
# methods add it to the new database before any other record.
# NOTE(review): presumably configures GUID-keyed indexing on objectUUID,
# which would be why every test record carries an objectUUID - confirm
# against the ldb index documentation.
MDB_INDEX_OBJ = {
    "dn": "@INDEXLIST",
    "@IDXONE": [b"1"],
    "@IDXGUID": [b"objectUUID"],
    "@IDX_DN_GUID": [b"GUID"]
}
24
def tempdir():
    """Create a new temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set, the directory is
    created under "<SELFTEST_PREFIX>/tmp"; otherwise the default location
    chosen by tempfile is used.
    """
    import tempfile
    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
32
33
class NoContextTests(TestCase):
    """Tests for module-level ldb helpers that need no database handle."""

    def test_valid_attr_name(self):
        # "foo" is accepted as an attribute name, "24foo" is rejected.
        accepted = ldb.valid_attr_name("foo")
        rejected = ldb.valid_attr_name("24foo")
        self.assertTrue(accepted)
        self.assertFalse(rejected)

    def test_timestring(self):
        # Seconds since the epoch -> generalized time string.
        cases = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for stamp, seconds in cases:
            self.assertEqual(stamp, ldb.timestring(seconds))

    def test_string_to_time(self):
        # Inverse direction of test_timestring.
        cases = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, stamp in cases:
            self.assertEqual(seconds, ldb.string_to_time(stamp))

    def test_binary_encode(self):
        raw = b'test\\x'

        # Encoding followed by decoding must give back the input bytes.
        from_bytes = ldb.binary_encode(raw)
        self.assertEqual(ldb.binary_decode(from_bytes), raw)

        # A text argument must encode the same way as the bytes one.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
55
56
class LdbBaseTest(TestCase):
    """Shared base for the database-backed tests.

    Provides the backend URL prefix (defaulting to TDB_PREFIX), the URL
    built from it, and the open flags matching the backend.
    """

    def setUp(self):
        super(LdbBaseTest, self).setUp()
        # A subclass may set self.prefix before calling up; a missing or
        # None prefix falls back to the tdb backend.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the backend prefix followed by self.filename."""
        backend = self.prefix
        return backend + self.filename

    def flags(self):
        """Return ldb.FLG_NOSYNC for the mdb backend and 0 otherwise."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
77
78
class SimpleLdb(LdbBaseTest):
    """Basic API tests, each run against a freshly created database.

    A subclass may set self.prefix (backend URL scheme) and self.index
    (a record added to the new database first) before calling setUp().
    """

    def setUp(self):
        super(SimpleLdb, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        # Only the attribute lookup is optional.  Wrapping the add() call
        # in "except AttributeError" would also hide an AttributeError
        # raised inside add() itself.
        if hasattr(self, "index"):
            self.ldb.add(self.index)

    def tearDown(self):
        # Drop our handle first so the LDB is closed (and its FD released)
        # before the directory holding the database file is removed.
        del self.ldb
        shutil.rmtree(self.testdir)
        super(SimpleLdb, self).tearDown()

    def test_connect(self):
        ldb.Ldb(self.url(), flags=self.flags())

    def test_connect_none(self):
        ldb.Ldb()

    def test_connect_later(self):
        x = ldb.Ldb()
        x.connect(self.url(), flags=self.flags())

    def test_repr(self):
        x = ldb.Ldb()
        self.assertTrue(repr(x).startswith("<ldb connection"))

    def test_set_create_perms(self):
        x = ldb.Ldb()
        x.set_create_perms(0o600)

    def test_modules_none(self):
        # An unconnected handle has no modules loaded.
        x = ldb.Ldb()
        self.assertEqual([], x.modules())

    def test_modules_tdb(self):
        x = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))

    def test_firstmodule_none(self):
        x = ldb.Ldb()
        self.assertEqual(x.firstmodule, None)

    def test_firstmodule_tdb(self):
        x = ldb.Ldb(self.url(), flags=self.flags())
        mod = x.firstmodule
        self.assertEqual(repr(mod), "<ldb module 'tdb'>")

    def test_search(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search()), 0)

    def test_search_controls(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)

    def test_search_attrs(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)

    def test_search_string_dn(self):
        # Same as test_search_attrs, but the base DN is given as a string.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)

    def test_search_attr_string(self):
        # attrs must be a list; a bare str or bytes value is a TypeError.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(TypeError, l.search, attrs="dc")
        self.assertRaises(TypeError, l.search, attrs=b"dc")

    def test_opaque(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.set_opaque("my_opaque", l)
        self.assertTrue(l.get_opaque("my_opaque") is not None)
        self.assertEqual(None, l.get_opaque("unknown"))

    def test_search_scope_base_empty_db(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
                                      ldb.SCOPE_BASE)), 0)

    def test_search_scope_onelevel_empty_db(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
                                      ldb.SCOPE_ONELEVEL)), 0)

    def test_delete(self):
        # Deleting a record that does not exist is an error.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))

    def test_delete_w_unhandled_ctrl(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo1")
        m["b"] = [b"a"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        # The delete with the control is rejected; the plain delete that
        # follows must still succeed.
        self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
        l.delete(m.dn)

    def test_contains(self):
        name = self.url()
        l = ldb.Ldb(name, flags=self.flags())
        self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
        l = ldb.Ldb(name, flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo3")
        m["b"] = ["a"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
            self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
        finally:
            l.delete(m.dn)

    def test_get_config_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_config_basedn())

    def test_get_root_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_root_basedn())

    def test_get_schema_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_schema_basedn())

    def test_get_default_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_default_basedn())

    def test_add(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo4"))

    def test_search_iterator(self):
        # Exercises search_iterator(): an abandoned iterator, an empty
        # database, complete iteration, and iteration resumed after an
        # early break.
        l = ldb.Ldb(self.url(), flags=self.flags())

        def expect_no_messages(it):
            # Fails the test as soon as the iterator yields anything.
            for me in it:
                self.fail()

        def collect(it, msgs, first_only=False):
            # Appends messages from `it` to `msgs`, stopping after the
            # first one when first_only is set.
            for me in it:
                self.assertTrue(isinstance(me, ldb.Message))
                msgs.append(me)
                if first_only:
                    break

        def assert_both_returned(msgs, dn_a, dn_b):
            # The two records may come back in either order.
            self.assertEqual(len(msgs), 2)
            if msgs[0].dn == dn_a:
                self.assertEqual(msgs[0].dn, dn_a)
                self.assertEqual(msgs[1].dn, dn_b)
            else:
                self.assertEqual(msgs[0].dn, dn_b)
                self.assertEqual(msgs[1].dn, dn_a)

        # Once abandoned, every further operation raises RuntimeError.
        s = l.search_iterator()
        s.abandon()
        self.assertRaises(RuntimeError, expect_no_messages, s)
        self.assertRaises(RuntimeError, s.abandon)
        self.assertRaises(RuntimeError, s.result)

        # Empty database: nothing is yielded and the result is empty.
        s = l.search_iterator()
        msgs = []
        collect(s, msgs)
        r = s.result()
        self.assertEqual(len(r), 0)
        self.assertEqual(len(msgs), 0)

        m1 = ldb.Message()
        m1.dn = ldb.Dn(l, "dc=foo4")
        m1["bla"] = b"bla"
        m1["objectUUID"] = b"0123456789abcdef"
        l.add(m1)
        # Remember what was really added, so that cleanup after an early
        # failure does not raise on a missing record and mask the failure.
        added = [m1.dn]
        try:
            s = l.search_iterator()
            msgs = []
            collect(s, msgs)
            r = s.result()
            self.assertEqual(len(r), 0)
            self.assertEqual(len(msgs), 1)
            self.assertEqual(msgs[0].dn, m1.dn)

            m2 = ldb.Message()
            m2.dn = ldb.Dn(l, "dc=foo5")
            m2["bla"] = b"bla"
            m2["objectUUID"] = b"0123456789abcdee"
            l.add(m2)
            added.append(m2.dn)

            s = l.search_iterator()
            msgs = []
            collect(s, msgs)
            r = s.result()
            self.assertEqual(len(r), 0)
            assert_both_returned(msgs, m1.dn, m2.dn)

            # Take one message, then check that result() is refused while
            # the search is still in progress.
            s = l.search_iterator()
            msgs = []
            collect(s, msgs, first_only=True)
            self.assertRaises(RuntimeError, s.result)
            # Resuming yields the second message and then nothing more.
            collect(s, msgs, first_only=True)
            expect_no_messages(s)

            r = s.result()
            self.assertEqual(len(r), 0)
            assert_both_returned(msgs, m1.dn, m2.dn)
        finally:
            for dn in added:
                l.delete(dn)

    def test_add_text(self):
        # Like test_add, with the attribute value given as text.
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = "bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo4"))

    def test_add_w_unhandled_ctrl(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = b"bla"
        self.assertEqual(len(l.search()), 0)
        self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))

    def test_add_dict(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": ldb.Dn(l, "dc=foo5"),
             "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo5"))

    def test_add_dict_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": ldb.Dn(l, "dc=foo5"),
             "bla": "bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo5"))

    def test_add_dict_string_dn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": "dc=foo6", "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo6"))

    def test_add_dict_bytes_dn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": b"dc=foo6", "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo6"))

    def test_rename(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo7")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=bar"))

    def test_rename_string_dns(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo8")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        self.assertEqual(len(l.search()), 1)
        try:
            l.rename("dc=foo8", "dc=bar")
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=bar"))

    def test_rename_bad_string_dns(self):
        # A malformed DN on either side of the rename must be rejected.
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo8")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        # Clean up in "finally", as the sibling rename tests do.
        try:
            self.assertEqual(len(l.search()), 1)
            self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
            self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
        finally:
            l.delete(ldb.Dn(l, "dc=foo8"))

    def test_empty_dn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(0, len(l.search()))
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=empty")
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search()
        self.assertEqual(1, len(rm))
        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                         set(rm[0].keys()))

        rm = l.search(m.dn)
        self.assertEqual(1, len(rm))
        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                         set(rm[0].keys()))
        # Asking for an attribute the record lacks still returns the
        # record, with no elements in it.
        rm = l.search(m.dn, attrs=["blah"])
        self.assertEqual(1, len(rm))
        self.assertEqual(0, len(rm[0]))

    def test_modify_delete(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modifydelete")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search(m.dn)[0]
        self.assertEqual([b"1234"], list(rm["bla"]))
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modifydelete")
            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)
            self.assertEqual(1, len(rm))
            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                             set(rm[0].keys()))
            rm = l.search(m.dn, attrs=["bla"])
            self.assertEqual(1, len(rm))
            self.assertEqual(0, len(rm[0]))
        finally:
            l.delete(ldb.Dn(l, "dc=modifydelete"))

    def test_modify_delete_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modifydelete")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search(m.dn)[0]
        self.assertEqual(["1234"], list(rm.text["bla"]))
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modifydelete")
            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)
            self.assertEqual(1, len(rm))
            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                             set(rm[0].keys()))
            rm = l.search(m.dn, attrs=["bla"])
            self.assertEqual(1, len(rm))
            self.assertEqual(0, len(rm[0]))
        finally:
            l.delete(ldb.Dn(l, "dc=modifydelete"))

    def test_modify_add(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_add_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_replace(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modify2")
        m["bla"] = [b"1234", b"456"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modify2")
            m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"789"], list(rm["bla"]))
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
        finally:
            l.delete(ldb.Dn(l, "dc=modify2"))

    def test_modify_replace_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modify2")
        m.text["bla"] = ["1234", "456"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modify2")
            m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["789"], list(rm.text["bla"]))
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
        finally:
            l.delete(ldb.Dn(l, "dc=modify2"))

    def test_modify_flags_change(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"1234", b"456"], list(rm["bla"]))

            # Now create another modify, but switch the flags before we do it
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            l.modify(m)
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
            self.assertEqual([b"1234"], list(rm["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_flags_change_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["1234", "456"], list(rm.text["bla"]))

            # Now create another modify, but switch the flags before we do it
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            l.modify(m)
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
            self.assertEqual(["1234"], list(rm.text["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_transaction_commit(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.transaction_start()
        m = ldb.Message(ldb.Dn(l, "dc=foo9"))
        m["foo"] = [b"bar"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        l.transaction_commit()
        # The delete only succeeds if the commit really stored the record.
        l.delete(m.dn)

    def test_transaction_cancel(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.transaction_start()
        m = ldb.Message(ldb.Dn(l, "dc=foo10"))
        m["foo"] = [b"bar"]
        m["objectUUID"] = b"0123456789abcdee"
        l.add(m)
        l.transaction_cancel()
        # After the cancel the record must not be found.
        self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))

    def test_set_debug(self):
        def my_report_fn(level, text):
            pass
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.set_debug(my_report_fn)

    def test_zero_byte_string(self):
        """Check that a value with an embedded NUL byte is not truncated."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.add({
            "dn": b"dc=somedn",
            "objectclass": b"user",
            "cN": b"LDAPtestUSER",
            "givenname": b"ldap",
            "displayname": b"foo\0bar",
            "objectUUID": b"0123456789abcdef"
        })
        res = l.search(expression="(dn=dc=somedn)")
        self.assertEqual(b"foo\0bar", res[0]["displayname"][0])

    def test_no_crash_broken_expr(self):
        # A malformed filter must raise LdbError rather than crash.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
686
# Re-run every SimpleLdb test with the database stored in lmdb.
class SimpleLdbLmdb(SimpleLdb):
    """SimpleLdb variant that selects the mdb:// backend and its index."""

    def setUp(self):
        # Both attributes must exist before the parent setUp() runs: it
        # opens the database from the prefix and adds the index record.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SimpleLdbLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend-specific to clean up; defer to SimpleLdb.
        super(SimpleLdbLmdb, self).tearDown()
697
698 class SearchTests(LdbBaseTest):
699     def tearDown(self):
700         shutil.rmtree(self.testdir)
701         super(SearchTests, self).tearDown()
702
703         # Ensure the LDB is closed now, so we close the FD
704         del(self.l)
705
706
707     def setUp(self):
708         super(SearchTests, self).setUp()
709         self.testdir = tempdir()
710         self.filename = os.path.join(self.testdir, "search_test.ldb")
711         options = ["modules:rdn_name"]
712         if hasattr(self, 'IDXCHECK'):
713             options.append("disable_full_db_scan_for_self_test:1")
714         self.l = ldb.Ldb(self.url(),
715                          flags=self.flags(),
716                          options=options)
717         try:
718             self.l.add(self.index)
719         except AttributeError:
720             pass
721
722         self.l.add({"dn": "@ATTRIBUTES",
723                     "DC": "CASE_INSENSITIVE"})
724
725         # Note that we can't use the name objectGUID here, as we
726         # want to stay clear of the objectGUID handler in LDB and
727         # instead use just the 16 bytes raw, which we just keep
728         # to printable chars here for ease of handling.
729
730         self.l.add({"dn": "DC=SAMBA,DC=ORG",
731                     "name": b"samba.org",
732                     "objectUUID": b"0123456789abcdef"})
733         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
734                     "name": b"Admins",
735                     "x": "z", "y": "a",
736                     "objectUUID": b"0123456789abcde1"})
737         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
738                     "name": b"Users",
739                     "x": "z", "y": "a",
740                     "objectUUID": b"0123456789abcde2"})
741         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
742                     "name": b"OU #1",
743                     "x": "y", "y": "a",
744                     "objectUUID": b"0123456789abcde3"})
745         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
746                     "name": b"OU #2",
747                     "x": "y", "y": "a",
748                     "objectUUID": b"0123456789abcde4"})
749         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
750                     "name": b"OU #3",
751                     "x": "y", "y": "a",
752                     "objectUUID": b"0123456789abcde5"})
753         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
754                     "name": b"OU #4",
755                     "x": "y", "y": "a",
756                     "objectUUID": b"0123456789abcde6"})
757         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
758                     "name": b"OU #5",
759                     "x": "y", "y": "a",
760                     "objectUUID": b"0123456789abcde7"})
761         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
762                     "name": b"OU #6",
763                     "x": "y", "y": "a",
764                     "objectUUID": b"0123456789abcde8"})
765         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
766                     "name": b"OU #7",
767                     "x": "y", "y": "a",
768                     "objectUUID": b"0123456789abcde9"})
769         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
770                     "name": b"OU #8",
771                     "x": "y", "y": "a",
772                     "objectUUID": b"0123456789abcde0"})
773         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
774                     "name": b"OU #9",
775                     "x": "y", "y": "a",
776                     "objectUUID": b"0123456789abcdea"})
777         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
778                     "name": b"OU #10",
779                     "x": "y", "y": "a",
780                     "objectUUID": b"0123456789abcdeb"})
781         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
782                     "name": b"OU #10",
783                     "x": "y", "y": "a",
784                     "objectUUID": b"0123456789abcdec"})
785         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
786                     "name": b"OU #10",
787                     "x": "y", "y": "b",
788                     "objectUUID": b"0123456789abcded"})
789         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
790                     "name": b"OU #10",
791                     "x": "x", "y": "b",
792                     "objectUUID": b"0123456789abcdee"})
793         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
794                     "name": b"OU #10",
795                     "x": "x", "y": "b",
796                     "objectUUID": b"0123456789abcd01"})
797         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
798                     "name": b"OU #10",
799                     "x": "x", "y": "b",
800                     "objectUUID": b"0123456789abcd02"})
801         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
802                     "name": b"OU #10",
803                     "x": "x", "y": "b",
804                     "objectUUID": b"0123456789abcd03"})
805         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
806                     "name": b"OU #10",
807                     "x": "x", "y": "b",
808                     "objectUUID": b"0123456789abcd04"})
809         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
810                     "name": b"OU #10",
811                     "x": "x", "y": "b",
812                     "objectUUID": b"0123456789abcd05"})
813         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
814                     "name": b"OU #10",
815                     "x": "x", "y": "b",
816                     "objectUUID": b"0123456789abcd06"})
817         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
818                     "name": b"OU #10",
819                     "x": "x", "y": "b",
820                     "objectUUID": b"0123456789abcd07"})
821         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
822                     "name": b"OU #10",
823                     "x": "x", "y": "c",
824                     "objectUUID": b"0123456789abcd08"})
825         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
826                     "name": b"OU #10",
827                     "x": "x", "y": "c",
828                     "objectUUID": b"0123456789abcd09"})
829
830     def test_base(self):
831         """Testing a search"""
832
833         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
834                               scope=ldb.SCOPE_BASE)
835         self.assertEqual(len(res11), 1)
836
837     def test_base_lower(self):
838         """Testing a search"""
839
840         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
841                               scope=ldb.SCOPE_BASE)
842         self.assertEqual(len(res11), 1)
843
844     def test_base_or(self):
845         """Testing a search"""
846
847         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
848                               scope=ldb.SCOPE_BASE,
849                               expression="(|(ou=ou11)(ou=ou12))")
850         self.assertEqual(len(res11), 1)
851
852     def test_base_or2(self):
853         """Testing a search"""
854
855         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
856                               scope=ldb.SCOPE_BASE,
857                               expression="(|(x=y)(y=b))")
858         self.assertEqual(len(res11), 1)
859
860     def test_base_and(self):
861         """Testing a search"""
862
863         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
864                               scope=ldb.SCOPE_BASE,
865                               expression="(&(ou=ou11)(ou=ou12))")
866         self.assertEqual(len(res11), 0)
867
868     def test_base_and2(self):
869         """Testing a search"""
870
871         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
872                               scope=ldb.SCOPE_BASE,
873                               expression="(&(x=y)(y=a))")
874         self.assertEqual(len(res11), 1)
875
876     def test_base_false(self):
877         """Testing a search"""
878
879         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
880                               scope=ldb.SCOPE_BASE,
881                               expression="(|(ou=ou13)(ou=ou12))")
882         self.assertEqual(len(res11), 0)
883
884     def test_check_base_false(self):
885         """Testing a search"""
886         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
887                               scope=ldb.SCOPE_BASE,
888                               expression="(|(ou=ou13)(ou=ou12))")
889         self.assertEqual(len(res11), 0)
890
891     def test_check_base_error(self):
892         """Testing a search"""
893         checkbaseonsearch = {"dn": "@OPTIONS",
894                              "checkBaseOnSearch": b"TRUE"}
895         try:
896             self.l.add(checkbaseonsearch)
897         except ldb.LdbError as err:
898             enum = err.args[0]
899             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
900             m = ldb.Message.from_dict(self.l,
901                                       checkbaseonsearch)
902             self.l.modify(m)
903
904         try:
905             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
906                                   scope=ldb.SCOPE_BASE,
907                                   expression="(|(ou=ou13)(ou=ou12))")
908             self.fail("Should have failed on missing base")
909         except ldb.LdbError as err:
910             enum = err.args[0]
911             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
912
913     def test_subtree_and(self):
914         """Testing a search"""
915
916         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
917                               scope=ldb.SCOPE_SUBTREE,
918                               expression="(&(ou=ou11)(ou=ou12))")
919         self.assertEqual(len(res11), 0)
920
921     def test_subtree_and2(self):
922         """Testing a search"""
923
924         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
925                               scope=ldb.SCOPE_SUBTREE,
926                               expression="(&(x=y)(|(y=b)(y=c)))")
927         self.assertEqual(len(res11), 1)
928
929     def test_subtree_and2_lower(self):
930         """Testing a search"""
931
932         res11 = self.l.search(base="DC=samba,DC=org",
933                               scope=ldb.SCOPE_SUBTREE,
934                               expression="(&(x=y)(|(y=b)(y=c)))")
935         self.assertEqual(len(res11), 1)
936
937     def test_subtree_or(self):
938         """Testing a search"""
939
940         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
941                               scope=ldb.SCOPE_SUBTREE,
942                               expression="(|(ou=ou11)(ou=ou12))")
943         self.assertEqual(len(res11), 2)
944
945     def test_subtree_or2(self):
946         """Testing a search"""
947
948         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
949                               scope=ldb.SCOPE_SUBTREE,
950                               expression="(|(x=y)(y=b))")
951         self.assertEqual(len(res11), 20)
952
953     def test_subtree_or3(self):
954         """Testing a search"""
955
956         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
957                               scope=ldb.SCOPE_SUBTREE,
958                               expression="(|(x=y)(y=b)(y=c))")
959         self.assertEqual(len(res11), 22)
960
961     def test_one_and(self):
962         """Testing a search"""
963
964         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
965                               scope=ldb.SCOPE_ONELEVEL,
966                               expression="(&(ou=ou11)(ou=ou12))")
967         self.assertEqual(len(res11), 0)
968
969     def test_one_and2(self):
970         """Testing a search"""
971
972         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
973                               scope=ldb.SCOPE_ONELEVEL,
974                               expression="(&(x=y)(y=b))")
975         self.assertEqual(len(res11), 1)
976
977     def test_one_or(self):
978         """Testing a search"""
979
980         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
981                               scope=ldb.SCOPE_ONELEVEL,
982                               expression="(|(ou=ou11)(ou=ou12))")
983         self.assertEqual(len(res11), 2)
984
985     def test_one_or2(self):
986         """Testing a search"""
987
988         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
989                               scope=ldb.SCOPE_ONELEVEL,
990                               expression="(|(x=y)(y=b))")
991         self.assertEqual(len(res11), 20)
992
993     def test_one_or2_lower(self):
994         """Testing a search"""
995
996         res11 = self.l.search(base="DC=samba,DC=org",
997                               scope=ldb.SCOPE_ONELEVEL,
998                               expression="(|(x=y)(y=b))")
999         self.assertEqual(len(res11), 20)
1000
1001     def test_one_unindexable(self):
1002         """Testing a search"""
1003
1004         try:
1005             res11 = self.l.search(base="DC=samba,DC=org",
1006                                   scope=ldb.SCOPE_ONELEVEL,
1007                                   expression="(y=b*)")
1008             if hasattr(self, 'IDX') and \
1009                not hasattr(self, 'IDXONE') and \
1010                hasattr(self, 'IDXCHECK'):
1011                 self.fail("Should have failed as un-indexed search")
1012
1013             self.assertEqual(len(res11), 9)
1014
1015         except ldb.LdbError as err:
1016             enum = err.args[0]
1017             estr = err.args[1]
1018             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1019             self.assertIn(estr, "ldb FULL SEARCH disabled")
1020
1021     def test_one_unindexable_presence(self):
1022         """Testing a search"""
1023
1024         try:
1025             res11 = self.l.search(base="DC=samba,DC=org",
1026                                   scope=ldb.SCOPE_ONELEVEL,
1027                                   expression="(y=*)")
1028             if hasattr(self, 'IDX') and \
1029                not hasattr(self, 'IDXONE') and \
1030                hasattr(self, 'IDXCHECK'):
1031                 self.fail("Should have failed as un-indexed search")
1032
1033             self.assertEqual(len(res11), 24)
1034
1035         except ldb.LdbError as err:
1036             enum = err.args[0]
1037             estr = err.args[1]
1038             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1039             self.assertIn(estr, "ldb FULL SEARCH disabled")
1040
1041
1042     def test_subtree_and_or(self):
1043         """Testing a search"""
1044
1045         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1046                               scope=ldb.SCOPE_SUBTREE,
1047                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1048         self.assertEqual(len(res11), 0)
1049
1050     def test_subtree_and_or2(self):
1051         """Testing a search"""
1052
1053         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1054                               scope=ldb.SCOPE_SUBTREE,
1055                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1056         self.assertEqual(len(res11), 0)
1057
1058     def test_subtree_and_or3(self):
1059         """Testing a search"""
1060
1061         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1062                               scope=ldb.SCOPE_SUBTREE,
1063                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1064         self.assertEqual(len(res11), 2)
1065
1066     def test_subtree_and_or4(self):
1067         """Testing a search"""
1068
1069         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1070                               scope=ldb.SCOPE_SUBTREE,
1071                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1072         self.assertEqual(len(res11), 2)
1073
1074     def test_subtree_and_or5(self):
1075         """Testing a search"""
1076
1077         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1078                               scope=ldb.SCOPE_SUBTREE,
1079                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1080         self.assertEqual(len(res11), 1)
1081
1082     def test_subtree_or_and(self):
1083         """Testing a search"""
1084
1085         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1086                               scope=ldb.SCOPE_SUBTREE,
1087                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1088         self.assertEqual(len(res11), 10)
1089
1090     def test_subtree_large_and_unique(self):
1091         """Testing a search"""
1092
1093         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1094                               scope=ldb.SCOPE_SUBTREE,
1095                               expression="(&(ou=ou10)(y=a))")
1096         self.assertEqual(len(res11), 1)
1097
1098     def test_subtree_and_none(self):
1099         """Testing a search"""
1100
1101         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1102                               scope=ldb.SCOPE_SUBTREE,
1103                               expression="(&(ou=ouX)(y=a))")
1104         self.assertEqual(len(res11), 0)
1105
1106     def test_subtree_and_idx_record(self):
1107         """Testing a search against the index record"""
1108
1109         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1110                               scope=ldb.SCOPE_SUBTREE,
1111                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1112         self.assertEqual(len(res11), 0)
1113
1114     def test_subtree_and_idxone_record(self):
1115         """Testing a search against the index record"""
1116
1117         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1118                               scope=ldb.SCOPE_SUBTREE,
1119                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1120         self.assertEqual(len(res11), 0)
1121
1122     def test_subtree_unindexable(self):
1123         """Testing a search"""
1124
1125         try:
1126             res11 = self.l.search(base="DC=samba,DC=org",
1127                                   scope=ldb.SCOPE_SUBTREE,
1128                                   expression="(y=b*)")
1129             if hasattr(self, 'IDX') and \
1130                hasattr(self, 'IDXCHECK'):
1131                 self.fail("Should have failed as un-indexed search")
1132
1133             self.assertEqual(len(res11), 9)
1134
1135         except ldb.LdbError as err:
1136             enum = err.args[0]
1137             estr = err.args[1]
1138             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1139             self.assertIn(estr, "ldb FULL SEARCH disabled")
1140
1141     def test_subtree_unindexable_presence(self):
1142         """Testing a search"""
1143
1144         try:
1145             res11 = self.l.search(base="DC=samba,DC=org",
1146                                   scope=ldb.SCOPE_SUBTREE,
1147                                   expression="(y=*)")
1148             if hasattr(self, 'IDX') and \
1149                hasattr(self, 'IDXCHECK'):
1150                 self.fail("Should have failed as un-indexed search")
1151
1152             self.assertEqual(len(res11), 24)
1153
1154         except ldb.LdbError as err:
1155             enum = err.args[0]
1156             estr = err.args[1]
1157             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1158             self.assertIn(estr, "ldb FULL SEARCH disabled")
1159
1160
1161     def test_dn_filter_one(self):
1162         """Testing that a dn= filter succeeds
1163         (or fails with disallowDNFilter
1164         set and IDXGUID or (IDX and not IDXONE) mode)
1165         when the scope is SCOPE_ONELEVEL.
1166
1167         This should be made more consistent, but for now lock in
1168         the behaviour
1169
1170         """
1171
1172         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1173                               scope=ldb.SCOPE_ONELEVEL,
1174                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1175         if hasattr(self, 'disallowDNFilter') and \
1176            hasattr(self, 'IDX') and \
1177            (hasattr(self, 'IDXGUID') or \
1178             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1179             self.assertEqual(len(res11), 0)
1180         else:
1181             self.assertEqual(len(res11), 1)
1182
1183     def test_dn_filter_subtree(self):
1184         """Testing that a dn= filter succeeds
1185         (or fails with disallowDNFilter set)
1186         when the scope is SCOPE_SUBTREE"""
1187
1188         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1189                               scope=ldb.SCOPE_SUBTREE,
1190                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1191         if hasattr(self, 'disallowDNFilter') \
1192            and hasattr(self, 'IDX'):
1193             self.assertEqual(len(res11), 0)
1194         else:
1195             self.assertEqual(len(res11), 1)
1196
1197     def test_dn_filter_base(self):
1198         """Testing that (incorrectly) a dn= filter works
1199         when the scope is SCOPE_BASE"""
1200
1201         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1202                               scope=ldb.SCOPE_BASE,
1203                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1204
1205         # At some point we should fix this, but it isn't trivial
1206         self.assertEqual(len(res11), 1)
1207
1208     def test_distinguishedName_filter_one(self):
1209         """Testing that a distinguishedName= filter succeeds
1210         when the scope is SCOPE_ONELEVEL.
1211
1212         This should be made more consistent, but for now lock in
1213         the behaviour
1214
1215         """
1216
1217         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1218                               scope=ldb.SCOPE_ONELEVEL,
1219                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1220         self.assertEqual(len(res11), 1)
1221
1222     def test_distinguishedName_filter_subtree(self):
1223         """Testing that a distinguishedName= filter succeeds
1224         when the scope is SCOPE_SUBTREE"""
1225
1226         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1227                               scope=ldb.SCOPE_SUBTREE,
1228                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1229         self.assertEqual(len(res11), 1)
1230
1231     def test_distinguishedName_filter_base(self):
1232         """Testing that (incorrectly) a distinguishedName= filter works
1233         when the scope is SCOPE_BASE"""
1234
1235         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1236                               scope=ldb.SCOPE_BASE,
1237                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1238
1239         # At some point we should fix this, but it isn't trivial
1240         self.assertEqual(len(res11), 1)
1241
1242     def test_bad_dn_filter_base(self):
1243         """Testing that a dn= filter on an invalid DN works
1244         when the scope is SCOPE_BASE but
1245         returns zero results"""
1246
1247         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1248                               scope=ldb.SCOPE_BASE,
1249                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1250
1251         # At some point we should fix this, but it isn't trivial
1252         self.assertEqual(len(res11), 0)
1253
1254
1255     def test_bad_dn_filter_one(self):
1256         """Testing that a dn= filter succeeds but returns zero
1257         results when the DN is not valid on a SCOPE_ONELEVEL search
1258
1259         """
1260
1261         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1262                               scope=ldb.SCOPE_ONELEVEL,
1263                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1264         self.assertEqual(len(res11), 0)
1265
1266     def test_bad_dn_filter_subtree(self):
1267         """Testing that a dn= filter succeeds but returns zero
1268         results when the DN is not valid on a SCOPE_SUBTREE search
1269
1270         """
1271
1272         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1273                               scope=ldb.SCOPE_SUBTREE,
1274                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1275         self.assertEqual(len(res11), 0)
1276
1277     def test_bad_distinguishedName_filter_base(self):
1278         """Testing that a distinguishedName= filter on an invalid DN works
1279         when the scope is SCOPE_BASE but
1280         returns zero results"""
1281
1282         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1283                               scope=ldb.SCOPE_BASE,
1284                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1285
1286         # At some point we should fix this, but it isn't trivial
1287         self.assertEqual(len(res11), 0)
1288
1289
1290     def test_bad_distinguishedName_filter_one(self):
1291         """Testing that a distinguishedName= filter succeeds but returns zero
1292         results when the DN is not valid on a SCOPE_ONELEVEL search
1293
1294         """
1295
1296         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1297                               scope=ldb.SCOPE_ONELEVEL,
1298                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1299         self.assertEqual(len(res11), 0)
1300
1301     def test_bad_distinguishedName_filter_subtree(self):
1302         """Testing that a distinguishedName= filter succeeds but returns zero
1303         results when the DN is not valid on a SCOPE_SUBTREE search
1304
1305         """
1306
1307         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1308                               scope=ldb.SCOPE_SUBTREE,
1309                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1310         self.assertEqual(len(res11), 0)
1311
1312     def test_bad_dn_search_base(self):
1313         """Testing with a bad base DN (SCOPE_BASE)"""
1314
1315         try:
1316             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1317                                   scope=ldb.SCOPE_BASE)
1318             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1319         except ldb.LdbError as err:
1320             enum = err.args[0]
1321             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1322
1323
1324     def test_bad_dn_search_one(self):
1325         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1326
1327         try:
1328             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1329                               scope=ldb.SCOPE_ONELEVEL)
1330             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1331         except ldb.LdbError as err:
1332             enum = err.args[0]
1333             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1334
1335     def test_bad_dn_search_subtree(self):
1336         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1337
1338         try:
1339             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1340                                   scope=ldb.SCOPE_SUBTREE)
1341             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1342         except ldb.LdbError as err:
1343             enum = err.args[0]
1344             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1345
1346
1347
1348 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """Run the SearchTests cases with the mdb:// URL prefix.

    setUp selects MDB_PREFIX and MDB_INDEX_OBJ before delegating to
    SearchTests.setUp.
    """

    def setUp(self):
        # Both attributes are assigned before the parent setUp runs.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(SearchTestsLmdb, self).tearDown()
1358
1359
class IndexedSearchTests(SearchTests):
    """Run the SearchTests cases with x, y and ou listed in @IDXATTR,
    to ensure the index doesn't break things."""

    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker that the inherited tests look up with hasattr().
        self.IDX = True
1369
class IndexedCheckSearchTests(IndexedSearchTests):
    """IndexedSearchTests with the IDXCHECK marker set
    (full scan disabled)."""

    def setUp(self):
        # Set before the parent setUp runs; presumably the base setUp
        # reads it to disable full scans -- confirm against
        # SearchTests.setUp.
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
1377
class IndexedSearchDnFilterTests(SearchTests):
    """Run the SearchTests cases with disallowDNFilter set and x, y, ou
    indexed.

    setUp adds an @OPTIONS record carrying disallowDNFilter=TRUE and an
    @INDEXLIST record, and sets the disallowDNFilter and IDX markers
    that the inherited tests look up with hasattr().
    """

    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()
        options = {"dn": "@OPTIONS",
                   "disallowDNFilter": "TRUE"}
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}

        # Records are added in the same order as before: options first.
        self.l.add(options)
        self.l.add(index_record)

        self.disallowDNFilter = True
        self.IDX = True
1391
class IndexedAndOneLevelSearchTests(SearchTests):
    """Run the SearchTests cases with x, y and ou indexed and @IDXONE
    present in the @INDEXLIST record."""

    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Markers that the inherited tests look up with hasattr().
        self.IDXONE = True
        self.IDX = True
1403
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """IndexedAndOneLevelSearchTests with the IDXCHECK marker set
    (full scan disabled)."""

    def setUp(self):
        # Set before the parent setUp runs; presumably the base setUp
        # reads it to disable full scans -- confirm against
        # SearchTests.setUp.
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1411
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Run the SearchTests cases with @IDXONE indexing plus the
    disallowDNFilter and checkBaseOnSearch options.

    setUp adds an @OPTIONS record with both options set to TRUE and an
    @INDEXLIST record with @IDXATTR and @IDXONE, and sets the matching
    markers that the inherited tests look up with hasattr().
    """

    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
        options = {"dn": "@OPTIONS",
                   "disallowDNFilter": "TRUE",
                   "checkBaseOnSearch": "TRUE"}
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}

        # Records are added in the same order as before: options first.
        self.l.add(options)
        self.l.add(index_record)

        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXONE = True
1429
class GUIDIndexedSearchTests(SearchTests):
    """Run the SearchTests cases with a GUID-keyed @INDEXLIST record.

    self.index is assigned before SearchTests.setUp runs; the IDXGUID
    and IDXONE markers are set afterwards.
    """

    def setUp(self):
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.index = index_record
        super(GUIDIndexedSearchTests, self).setUp()

        # NOTE(review): IDXONE is set although the index record above has
        # no @IDXONE element, and IDX is not set here -- confirm intended.
        self.IDXONE = True
        self.IDXGUID = True
1443
1444
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Run the SearchTests cases with a GUID-keyed @INDEXLIST record
    plus the disallowDNFilter and checkBaseOnSearch options.

    self.index is assigned before SearchTests.setUp runs; the @OPTIONS
    record is added and the markers are set afterwards.
    """

    def setUp(self):
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.index = index_record
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options = {"dn": "@OPTIONS",
                   "disallowDNFilter": "TRUE",
                   "checkBaseOnSearch": "TRUE"}
        self.l.add(options)

        # Markers that the inherited tests look up with hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDXGUID = True
        self.IDX = True
1462
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Run the SearchTests cases with a GUID-keyed @INDEXLIST record,
    the disallowDNFilter and checkBaseOnSearch options, and the IDXONE
    marker.

    self.index is assigned before SearchTests.setUp runs; the @OPTIONS
    record is added and the markers are set afterwards.
    """

    def setUp(self):
        # NOTE(review): the class name and the IDXONE marker suggest
        # one-level indexing, but this record has no @IDXONE element --
        # confirm whether that is intended.
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.index = index_record
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options = {"dn": "@OPTIONS",
                   "disallowDNFilter": "TRUE",
                   "checkBaseOnSearch": "TRUE"}
        self.l.add(options)

        # Markers that the inherited tests look up with hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDXONE = True
        self.IDXGUID = True
        self.IDX = True
1481
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """Run the GUIDIndexedSearchTests cases with the mdb:// URL prefix."""

    def setUp(self):
        # The prefix is assigned before the parent setUp runs.
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1490
1491
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """Run the GUIDIndexedDNFilterSearchTests cases with the mdb:// URL
    prefix."""

    def setUp(self):
        # The prefix is assigned before the parent setUp runs.
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1500
1501
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """Run the GUIDAndOneLevelIndexedSearchTests cases with the mdb://
    URL prefix."""

    def setUp(self):
        # The prefix is assigned before the parent setUp runs.
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1510
1511
1512 class AddModifyTests(LdbBaseTest):
1513     def tearDown(self):
1514         shutil.rmtree(self.testdir)
1515         super(AddModifyTests, self).tearDown()
1516
1517         # Ensure the LDB is closed now, so we close the FD
1518         del(self.l)
1519
1520     def setUp(self):
1521         super(AddModifyTests, self).setUp()
1522         self.testdir = tempdir()
1523         self.filename = os.path.join(self.testdir, "add_test.ldb")
1524         self.l = ldb.Ldb(self.url(),
1525                          flags=self.flags(),
1526                          options=["modules:rdn_name"])
1527         try:
1528             self.l.add(self.index)
1529         except AttributeError:
1530             pass
1531
1532         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1533                     "name": b"samba.org",
1534                     "objectUUID": b"0123456789abcdef"})
1535         self.l.add({"dn": "@ATTRIBUTES",
1536                     "objectUUID": "UNIQUE_INDEX"})
1537
1538     def test_add_dup(self):
1539         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1540                     "name": b"Admins",
1541                     "x": "z", "y": "a",
1542                     "objectUUID": b"0123456789abcde1"})
1543         try:
1544             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1545                         "name": b"Admins",
1546                         "x": "z", "y": "a",
1547                         "objectUUID": b"0123456789abcde2"})
1548             self.fail("Should have failed adding dupliate entry")
1549         except ldb.LdbError as err:
1550             enum = err.args[0]
1551             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1552
1553     def test_add_bad(self):
1554         try:
1555             self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
1556                         "name": b"Admins",
1557                         "x": "z", "y": "a",
1558                         "objectUUID": b"0123456789abcde1"})
1559             self.fail("Should have failed adding entry with invalid DN")
1560         except ldb.LdbError as err:
1561             enum = err.args[0]
1562             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1563
1564     def test_add_del_add(self):
1565         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1566                     "name": b"Admins",
1567                     "x": "z", "y": "a",
1568                     "objectUUID": b"0123456789abcde1"})
1569         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1570         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1571                     "name": b"Admins",
1572                     "x": "z", "y": "a",
1573                     "objectUUID": b"0123456789abcde2"})
1574
1575     def test_add_move_add(self):
1576         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1577                     "name": b"Admins",
1578                     "x": "z", "y": "a",
1579                     "objectUUID": b"0123456789abcde1"})
1580         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1581                       "OU=DUP2,DC=SAMBA,DC=ORG")
1582         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1583                     "name": b"Admins",
1584                     "x": "z", "y": "a",
1585                     "objectUUID": b"0123456789abcde2"})
1586
1587     def test_add_move_fail_move_move(self):
1588         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1589                     "name": b"Admins",
1590                     "x": "z", "y": "a",
1591                     "objectUUID": b"0123456789abcde1"})
1592         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1593                     "name": b"Admins",
1594                     "x": "z", "y": "a",
1595                     "objectUUID": b"0123456789abcde2"})
1596
1597         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1598                              scope=ldb.SCOPE_SUBTREE,
1599                              expression="(objectUUID=0123456789abcde1)")
1600         self.assertEqual(len(res2), 1)
1601         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1602
1603         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1604                              scope=ldb.SCOPE_SUBTREE,
1605                              expression="(objectUUID=0123456789abcde2)")
1606         self.assertEqual(len(res3), 1)
1607         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1608
1609         try:
1610             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1611                           "OU=DUP2,DC=SAMBA,DC=ORG")
1612             self.fail("Should have failed on duplicate DN")
1613         except ldb.LdbError as err:
1614             enum = err.args[0]
1615             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1616
1617         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1618                       "OU=DUP3,DC=SAMBA,DC=ORG")
1619
1620         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1621                       "OU=DUP2,DC=SAMBA,DC=ORG")
1622
1623         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1624                              scope=ldb.SCOPE_SUBTREE,
1625                              expression="(objectUUID=0123456789abcde1)")
1626         self.assertEqual(len(res2), 1)
1627         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1628
1629         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1630                              scope=ldb.SCOPE_SUBTREE,
1631                              expression="(objectUUID=0123456789abcde2)")
1632         self.assertEqual(len(res3), 1)
1633         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1634
1635     def test_move_missing(self):
1636         try:
1637             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1638                           "OU=DUP2,DC=SAMBA,DC=ORG")
1639             self.fail("Should have failed on missing")
1640         except ldb.LdbError as err:
1641             enum = err.args[0]
1642             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1643
1644     def test_move_missing2(self):
1645         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1646                     "name": b"Admins",
1647                     "x": "z", "y": "a",
1648                     "objectUUID": b"0123456789abcde2"})
1649
1650         try:
1651             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1652                           "OU=DUP2,DC=SAMBA,DC=ORG")
1653             self.fail("Should have failed on missing")
1654         except ldb.LdbError as err:
1655             enum = err.args[0]
1656             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1657
1658     def test_move_bad(self):
1659         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1660                     "name": b"Admins",
1661                     "x": "z", "y": "a",
1662                     "objectUUID": b"0123456789abcde2"})
1663
1664         try:
1665             self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
1666                           "OU=DUP2,DC=SAMBA,DC=ORG")
1667             self.fail("Should have failed on invalid DN")
1668         except ldb.LdbError as err:
1669             enum = err.args[0]
1670             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1671
1672     def test_move_bad2(self):
1673         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1674                     "name": b"Admins",
1675                     "x": "z", "y": "a",
1676                     "objectUUID": b"0123456789abcde2"})
1677
1678         try:
1679             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1680                           "OUXDUP2,DC=SAMBA,DC=ORG")
1681             self.fail("Should have failed on missing")
1682         except ldb.LdbError as err:
1683             enum = err.args[0]
1684             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1685
1686     def test_move_fail_move_add(self):
1687         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1688                     "name": b"Admins",
1689                     "x": "z", "y": "a",
1690                     "objectUUID": b"0123456789abcde1"})
1691         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1692                     "name": b"Admins",
1693                     "x": "z", "y": "a",
1694                     "objectUUID": b"0123456789abcde2"})
1695         try:
1696             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1697                           "OU=DUP2,DC=SAMBA,DC=ORG")
1698             self.fail("Should have failed on duplicate DN")
1699         except ldb.LdbError as err:
1700             enum = err.args[0]
1701             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1702
1703         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1704                       "OU=DUP3,DC=SAMBA,DC=ORG")
1705
1706         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1707                     "name": b"Admins",
1708                     "x": "z", "y": "a",
1709                     "objectUUID": b"0123456789abcde3"})
1710
1711
class AddModifyTestsLmdb(AddModifyTests):
    """Run the AddModifyTests cases with the mdb:// URL prefix and
       MDB_INDEX_OBJ as the index definition"""

    def setUp(self):
        # Select the backend and its index before delegating to the
        # parent setUp.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        parent = super(AddModifyTestsLmdb, self)
        parent.setUp()

    def tearDown(self):
        parent = super(AddModifyTestsLmdb, self)
        parent.tearDown()
1721
class IndexedAddModifyTests(AddModifyTests):
    """Run the add/modify tests with an index configured, and add checks
       for duplicate DN and duplicate objectUUID handling"""

    def setUp(self):
        # Subclasses may assign self.index before calling up to here; the
        # default below is installed only when nothing has been set.
        default_index = {"dn": "@INDEXLIST",
                         "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                         "@IDXONE": [b"1"]}
        if not hasattr(self, 'index'):
            self.index = default_index
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        # The add is expected to fail with ERR_CONSTRAINT_VIOLATION.
        # NOTE(review): presumably objectUUID 0123456789abcdef is already
        # stored by the parent setUp -- not visible here, confirm.
        clashing = {"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcdef"}
        try:
            self.l.add(clashing)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID(self):
        # Same DN and same objectUUID added twice: the second add must
        # fail with ERR_ENTRY_ALREADY_EXISTS.
        def admin_entry():
            return {"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"}

        self.l.add(admin_entry())
        try:
            self.l.add(admin_entry())
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID2(self):
        # Same DN but a different objectUUID: the second add must fail
        # with ERR_ENTRY_ALREADY_EXISTS.
        def entry(dn, guid):
            return {"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid}

        admin_dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        rejected_guid = b"aaa3456789abcdef"

        self.l.add(entry(admin_dn, b"abc3456789abcdef"))
        try:
            self.l.add(entry(admin_dn, rejected_guid))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate DN")

        # Checking the GUID didn't stick in the index
        self.l.add(entry("OU=DUP,DC=SAMBA,DC=ORG", rejected_guid))

    def test_add_dup_guid_add(self):
        # A second DN reusing the first entry's objectUUID must fail with
        # ERR_CONSTRAINT_VIOLATION; the same DN with a fresh objectUUID is
        # then added.
        def entry(dn, guid):
            return {"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid}

        second_dn = "OU=DUP2,DC=SAMBA,DC=ORG"
        shared_guid = b"0123456789abcde1"

        self.l.add(entry("OU=DUP,DC=SAMBA,DC=ORG", shared_guid))
        try:
            self.l.add(entry(second_dn, shared_guid))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        self.l.add(entry(second_dn, b"0123456789abcde2"))
1800
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Run the indexed add/modify tests with @IDXGUID and @IDX_DN_GUID
       set in the @INDEXLIST record"""

    def setUp(self):
        # Assigned before the parent setUp, which keeps an index that is
        # already present on the instance.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXONE": [b"1"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedAddModifyTests, self).setUp()
1812
1813
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Started here and committed in tearDown, so each test body runs
        # with a transaction open.
        database = self.l
        database.transaction_start()

    def tearDown(self):
        database = self.l
        database.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
1824
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Started here and committed in tearDown, so each test body runs
        # with a transaction open.
        database = self.l
        database.transaction_start()

    def tearDown(self):
        database = self.l
        database.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
1835
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Run the GUID-indexed add/modify tests with the mdb:// URL prefix"""

    def setUp(self):
        # The prefix is chosen before delegating to the parent setUp.
        self.prefix = MDB_PREFIX
        parent = super(GuidIndexedAddModifyTestsLmdb, self)
        parent.setUp()

    def tearDown(self):
        parent = super(GuidIndexedAddModifyTestsLmdb, self)
        parent.tearDown()
1844
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Run the in-transaction GUID-indexed add/modify tests with the
       mdb:// URL prefix"""

    def setUp(self):
        # The prefix is chosen before delegating to the parent setUp.
        self.prefix = MDB_PREFIX
        parent = super(GuidTransIndexedAddModifyTestsLmdb, self)
        parent.setUp()

    def tearDown(self):
        parent = super(GuidTransIndexedAddModifyTestsLmdb, self)
        parent.tearDown()
1853
class BadIndexTests(LdbBaseTest):
    """Check that searches keep working after @ATTRIBUTES is changed on a
    database that already holds indexed data.

    By default @INDEXLIST only carries @IDXATTR.  When the instance has an
    ``IDXGUID`` attribute (set by a subclass before setUp runs) the
    @IDXGUID / @IDX_DN_GUID keys are added as well.
    """

    def setUp(self):
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def test_unique(self):
        # Three entries share y=1, so declaring y a UNIQUE_INDEX afterwards
        # is expected to be rejected; the existing index must still answer
        # the search with all three entries.
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
            self.fail()
        except ldb.LdbError:
            pass

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        # Same scenario as test_unique, but the @ATTRIBUTES change is made
        # inside an explicit transaction whose commit is expected to fail
        # with ERR_OPERATIONS_ERROR.
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            # Deliberately ignored: the failure this test checks for is
            # the one reported by transaction_commit() below.
            pass

        try:
            self.ldb.transaction_commit()
            self.fail()

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        # Entries hold y=a, y=A and y=[a, A].  The search for (y=a) is run
        # before and after y is declared CASE_INSENSITIVE in @ATTRIBUTES.
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        # Same scenario as test_casefold, with the @ATTRIBUTES change
        # wrapped in an explicit transaction.
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def tearDown(self):
        super(BadIndexTests, self).tearDown()
        # setUp creates a new temporary directory for every test; remove
        # it so repeated runs do not accumulate them.
        shutil.rmtree(self.testdir)
2004
2005
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # The parent class checks for this attribute with hasattr(), so it
        # has to exist before the parent setUp is invoked.
        setattr(self, 'IDXGUID', True)
        parent = super(GUIDBadIndexTests, self)
        parent.setUp()
2013
class DnTests(TestCase):
    """Tests for ldb.Dn.

    Every test gets its own ``ldb.Ldb()`` (created without a URL) as
    ``self.ldb``; it is dropped again in tearDown.
    """

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del self.ldb

    def test_set_dn_invalid(self):
        # Assigning a plain string to Message.dn must raise TypeError.
        x = ldb.Message()

        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(str(x), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(repr(x), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold_mixed(self):
        # This test used to be called test_get_casefold as well and was
        # silently replaced by the later definition of that name, so it
        # never ran.
        # NOTE(review): the expected string keeps the value of "bar" in
        # lower case -- presumably because no case-insensitive syntax is
        # registered for that attribute; confirm against ldb.
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", str(x.parent()))

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        # The empty DN is expected to count as valid too.
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add_child_str(self):
        # Same as test_add_child, passing the child as a plain string.
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base_str(self):
        # Same as test_add_base, passing the base as a plain string.
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x) - 1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        # Each item yielded by parse_ldif is indexed with [1] to reach the
        # ldb.Message.
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        # Indexes outside the DN yield None.
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range indexes are checked on get_component_value itself;
        # the name accessor is covered by test_get_component_name.
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # An index past the last component raises and leaves the DN as is.
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # ',' and '+' in a value come back escaped in the string form.
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        # Both text and bytes values are accepted; reads return bytes.
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    def test_get_casefold(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
2255
2256 class LdbMsgTests(TestCase):
2257
2258     def setUp(self):
2259         super(LdbMsgTests, self).setUp()
2260         self.msg = ldb.Message()
2261
2262     def test_init_dn(self):
2263         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
2264         self.assertEqual("dc=foo27", str(self.msg.dn))
2265
2266     def test_iter_items(self):
2267         self.assertEqual(0, len(self.msg.items()))
2268         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
2269         self.assertEqual(1, len(self.msg.items()))
2270
2271     def test_repr(self):
2272         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2273         self.msg["dc"] = b"foo"
2274         if PY3:
2275             self.assertIn(repr(self.msg), [
2276                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2277                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2278             ])
2279             self.assertIn(repr(self.msg.text), [
2280                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2281                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2282             ])
2283         else:
2284             self.assertEqual(
2285                 repr(self.msg),
2286                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
2287             self.assertEqual(
2288                 repr(self.msg.text),
2289                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
2290
2291     def test_len(self):
2292         self.assertEqual(0, len(self.msg))
2293
2294     def test_notpresent(self):
2295         self.assertRaises(KeyError, lambda: self.msg["foo"])
2296
2297     def test_del(self):
2298         del self.msg["foo"]
2299
2300     def test_add(self):
2301         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2302
2303     def test_add_text(self):
2304         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2305
2306     def test_elements_empty(self):
2307         self.assertEqual([], self.msg.elements())
2308
2309     def test_elements(self):
2310         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2311         self.msg.add(el)
2312         self.assertEqual([el], self.msg.elements())
2313         self.assertEqual([el.text], self.msg.text.elements())
2314
2315     def test_add_value(self):
2316         self.assertEqual(0, len(self.msg))
2317         self.msg["foo"] = [b"foo"]
2318         self.assertEqual(1, len(self.msg))
2319
2320     def test_add_value_text(self):
2321         self.assertEqual(0, len(self.msg))
2322         self.msg["foo"] = ["foo"]
2323         self.assertEqual(1, len(self.msg))
2324
2325     def test_add_value_multiple(self):
2326         self.assertEqual(0, len(self.msg))
2327         self.msg["foo"] = [b"foo", b"bla"]
2328         self.assertEqual(1, len(self.msg))
2329         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2330
2331     def test_add_value_multiple_text(self):
2332         self.assertEqual(0, len(self.msg))
2333         self.msg["foo"] = ["foo", "bla"]
2334         self.assertEqual(1, len(self.msg))
2335         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2336
2337     def test_set_value(self):
2338         self.msg["foo"] = [b"fool"]
2339         self.assertEqual([b"fool"], list(self.msg["foo"]))
2340         self.msg["foo"] = [b"bar"]
2341         self.assertEqual([b"bar"], list(self.msg["foo"]))
2342
2343     def test_set_value_text(self):
2344         self.msg["foo"] = ["fool"]
2345         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2346         self.msg["foo"] = ["bar"]
2347         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2348
2349     def test_keys(self):
2350         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2351         self.msg["foo"] = [b"bla"]
2352         self.msg["bar"] = [b"bla"]
2353         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
2354
2355     def test_keys_text(self):
2356         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2357         self.msg["foo"] = ["bla"]
2358         self.msg["bar"] = ["bla"]
2359         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
2360
2361     def test_dn(self):
2362         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2363         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2364
2365     def test_get_dn(self):
2366         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2367         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2368
2369     def test_dn_text(self):
2370         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2371         self.assertEqual("@BASEINFO", str(self.msg.dn))
2372         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2373
2374     def test_get_dn_text(self):
2375         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2376         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2377         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2378
2379     def test_get_invalid(self):
2380         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2381         self.assertRaises(TypeError, self.msg.get, 42)
2382
2383     def test_get_other(self):
2384         self.msg["foo"] = [b"bar"]
2385         self.assertEqual(b"bar", self.msg.get("foo")[0])
2386         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2387         self.assertEqual(None, self.msg.get("foo", idx=1))
2388         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2389
2390     def test_get_other_text(self):
2391         self.msg["foo"] = ["bar"]
2392         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2393         self.assertEqual("bar", self.msg.text.get("foo")[0])
2394         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2395         self.assertEqual(None, self.msg.get("foo", idx=1))
2396         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2397
2398     def test_get_default(self):
2399         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2400         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2401
2402     def test_get_default_text(self):
2403         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2404         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2405
2406     def test_get_unknown(self):
2407         self.assertEqual(None, self.msg.get("lalalala"))
2408
2409     def test_get_unknown_text(self):
2410         self.assertEqual(None, self.msg.text.get("lalalala"))
2411
2412     def test_msg_diff(self):
2413         l = ldb.Ldb()
2414         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2415         msg1 = next(msgs)[1]
2416         msg2 = next(msgs)[1]
2417         msgdiff = l.msg_diff(msg1, msg2)
2418         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2419         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2420         self.assertEqual(1, len(msgdiff))
2421
2422     def test_equal_empty(self):
2423         msg1 = ldb.Message()
2424         msg2 = ldb.Message()
2425         self.assertEqual(msg1, msg2)
2426
2427     def test_equal_simplel(self):
2428         db = ldb.Ldb()
2429         msg1 = ldb.Message()
2430         msg1.dn = ldb.Dn(db, "foo=bar")
2431         msg2 = ldb.Message()
2432         msg2.dn = ldb.Dn(db, "foo=bar")
2433         self.assertEqual(msg1, msg2)
2434         msg1['foo'] = b'bar'
2435         msg2['foo'] = b'bar'
2436         self.assertEqual(msg1, msg2)
2437         msg2['foo'] = b'blie'
2438         self.assertNotEqual(msg1, msg2)
2439         msg2['foo'] = b'blie'
2440
2441     def test_from_dict(self):
2442         rec = {"dn": "dc=fromdict",
2443                "a1": [b"a1-val1", b"a1-val1"]}
2444         l = ldb.Ldb()
2445         # check different types of input Flags
2446         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2447             m = ldb.Message.from_dict(l, rec, flags)
2448             self.assertEqual(rec["a1"], list(m["a1"]))
2449             self.assertEqual(flags, m["a1"].flags())
2450         # check input params
2451         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2452         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2453         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2454         # Message.from_dict expects dictionary with 'dn'
2455         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2456         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2457
2458     def test_from_dict_text(self):
2459         rec = {"dn": "dc=fromdict",
2460                "a1": ["a1-val1", "a1-val1"]}
2461         l = ldb.Ldb()
2462         # check different types of input Flags
2463         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2464             m = ldb.Message.from_dict(l, rec, flags)
2465             self.assertEqual(rec["a1"], list(m.text["a1"]))
2466             self.assertEqual(flags, m.text["a1"].flags())
2467         # check input params
2468         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2469         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2470         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2471         # Message.from_dict expects dictionary with 'dn'
2472         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2473         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2474
2475     def test_copy_add_message_element(self):
2476         m = ldb.Message()
2477         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2478         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2479         mto = ldb.Message()
2480         mto["1"] = m["1"]
2481         mto["2"] = m["2"]
2482         self.assertEqual(mto["1"], m["1"])
2483         self.assertEqual(mto["2"], m["2"])
2484         mto = ldb.Message()
2485         mto.add(m["1"])
2486         mto.add(m["2"])
2487         self.assertEqual(mto["1"], m["1"])
2488         self.assertEqual(mto["2"], m["2"])
2489
2490     def test_copy_add_message_element_text(self):
2491         m = ldb.Message()
2492         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2493         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2494         mto = ldb.Message()
2495         mto["1"] = m["1"]
2496         mto["2"] = m["2"]
2497         self.assertEqual(mto["1"], m.text["1"])
2498         self.assertEqual(mto["2"], m.text["2"])
2499         mto = ldb.Message()
2500         mto.add(m["1"])
2501         mto.add(m["2"])
2502         self.assertEqual(mto.text["1"], m.text["1"])
2503         self.assertEqual(mto.text["2"], m.text["2"])
2504         self.assertEqual(mto["1"], m["1"])
2505         self.assertEqual(mto["2"], m["2"])
2506
2507
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement and its .text view."""

    def test_cmp_element(self):
        """Elements with equal values compare equal; differing values do not."""
        first = ldb.MessageElement([b"foo"])
        same = ldb.MessageElement([b"foo"])
        different = ldb.MessageElement([b"bzr"])
        self.assertEqual(first, same)
        self.assertNotEqual(first, different)

    def test_cmp_element_text(self):
        """An element built from bytes equals one built from the same text."""
        from_bytes = ldb.MessageElement([b"foo"])
        from_text = ldb.MessageElement(["foo"])
        self.assertEqual(from_bytes, from_text)

    def test_create_iterable(self):
        """Iterating yields bytes; iterating the text view yields str."""
        element = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(element))
        self.assertEqual(["foo"], list(element.text))

    def test_repr(self):
        """repr() lists the values; the text view's repr appends ".text"."""
        # The expected strings differ between Python 2 and 3 only in the
        # b'' prefix of the values.
        if PY3:
            single = ("MessageElement([b'foo'])",
                      "MessageElement([b'foo']).text")
            double = ("MessageElement([b'foo',b'bla'])",
                      "MessageElement([b'foo',b'bla']).text")
        else:
            single = ("MessageElement(['foo'])",
                      "MessageElement(['foo']).text")
            double = ("MessageElement(['foo','bla'])",
                      "MessageElement(['foo','bla']).text")

        element = ldb.MessageElement([b"foo"])
        self.assertEqual(single[0], repr(element))
        self.assertEqual(single[1], repr(element.text))

        element = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(element))
        self.assertEqual(double[0], repr(element))
        self.assertEqual(double[1], repr(element.text))

    def test_get_item(self):
        """Indexing supports positive and negative indices and bounds checks."""
        element = ldb.MessageElement([b"foo", b"bar"])
        for index, expected in ((0, b"foo"), (1, b"bar"), (-1, b"bar")):
            self.assertEqual(expected, element[index])
        with self.assertRaises(IndexError):
            element[45]

    def test_get_item_text(self):
        """Indexing the text view returns str values."""
        element = ldb.MessageElement(["foo", "bar"])
        for index, expected in ((0, "foo"), (1, "bar"), (-1, "bar")):
            self.assertEqual(expected, element.text[index])
        # NOTE(review): this indexes the element itself, not element.text --
        # confirm whether the text view was meant.
        with self.assertRaises(IndexError):
            element[45]

    def test_len(self):
        """len() reports the number of values."""
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(element))

    def test_eq(self):
        """Equality tracks the values as either side is replaced."""
        left = ldb.MessageElement([b"foo", b"bar"])
        right = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(right, left)
        # Shrink one side: no longer equal.
        left = ldb.MessageElement([b"foo"])
        self.assertNotEqual(right, left)
        # Shrink the other side to match: equal again.
        right = ldb.MessageElement([b"foo"])
        self.assertEqual(right, left)

    def test_extended(self):
        """repr() of an element built with flags and a name shows only values."""
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if PY3:
            expected = ("MessageElement([b'456'])",
                        "MessageElement([b'456']).text")
        else:
            expected = ("MessageElement(['456'])",
                        "MessageElement(['456']).text")
        self.assertEqual(expected[0], repr(element))
        self.assertEqual(expected[1], repr(element.text))

    def test_bad_text(self):
        """Reading an undecodable value via the text view raises UnicodeDecodeError."""
        element = ldb.MessageElement(b'\xba\xdd')
        text_getitem = element.text.__getitem__
        self.assertRaises(UnicodeDecodeError, text_getitem, 0)
2583
2584
class ModuleTests(TestCase):
    """Tests for registering and loading Python-implemented ldb modules."""

    def setUp(self):
        """Create a temporary directory and open an ldb file inside it."""
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        """Remove the temporary directory and run the parent's tearDown."""
        shutil.rmtree(self.testdir)
        # The parent's tearDown (not setUp) is the counterpart to call here.
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        """A class exposing only a name can be registered as a module."""
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        """A registered module listed in @MODULES is initialised on open."""
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                # Record the call so the test can observe when it happens.
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        db = ldb.Ldb(self.filename)
        db.add({"dn": "@MODULES", "@LIST": "bla"})
        # Adding the @MODULES record does not by itself initialise the module.
        self.assertEqual([], ops)
        # Re-opening the database does.
        db = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2623
class LdbResultTests(LdbBaseTest):
    """Tests for ldb search results against a freshly populated database."""

    def setUp(self):
        """Open a new database in a temporary directory and add 13 records."""
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())

        # Subclasses may define self.index; add it only when it is present.
        # Only the attribute lookup is guarded, so an AttributeError raised
        # inside add() itself is not silently swallowed.
        try:
            index = self.index
        except AttributeError:
            pass
        else:
            self.l.add(index)

        # (dn, name, objectUUID) for each record, in insertion order.
        records = [
            ("DC=SAMBA,DC=ORG", b"samba.org", b"0123456789abcde0"),
            ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", b"0123456789abcde1"),
            ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", b"0123456789abcde2"),
            ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", b"0123456789abcde3"),
            ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", b"0123456789abcde4"),
            ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", b"0123456789abcde5"),
            ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", b"0123456789abcde6"),
            ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", b"0123456789abcde7"),
            ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", b"0123456789abcde8"),
            ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", b"0123456789abcde9"),
            ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", b"0123456789abcdea"),
            ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", b"0123456789abcdeb"),
            ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", b"0123456789abcdec"),
        ]
        for dn, name, uuid in records:
            self.l.add({"dn": dn, "name": name, "objectUUID": uuid})

    def tearDown(self):
        """Remove the temporary directory and drop the database handle."""
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def test_return_type(self):
        """str() of a search result is the fixed "<ldb result>" marker."""
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        """The msgs attribute of a result can be read."""
        res = self.l.search()
        # Only the attribute access is under test; the value is not inspected.
        res.msgs

    def test_get_controls(self):
        """The controls attribute of a result can be read."""
        res = self.l.search()
        res.controls

    def test_get_referals(self):
        """The referals attribute of a result can be read."""
        res = self.l.search()
        res.referals

    def test_iter_msgs(self):
        """Iterating result.msgs reaches the last record added in setUp."""
        found = False
        for l in self.l.search().msgs:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        """The count attribute reflects the number of matching records."""
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        """result.controls is iterable."""
        res = self.l.search().controls
        iter(res)

    def test_create_control(self):
        """ldb.Control rejects an unknown control and parses "relax:1"."""
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        """result.referals is iterable."""
        res = self.l.search().referals
        iter(res)

    def test_search_sequence_msgs(self):
        """result.msgs supports len() and integer indexing."""
        found = False
        res = self.l.search().msgs

        # Deliberately index-based: this exercises the sequence protocol.
        for i in range(0, len(res)):
            l = res[i]
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        """A search result can be iterated directly."""
        found = False
        res = self.l.search()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        """search_iterator() yields the records added in setUp."""
        found = False
        res = self.l.search_iterator()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction

    def test_search_against_trans(self):
        """A record added in a child's open transaction appears only after commit.

        The parent and a forked child synchronise over two pipes: the child
        reports "added" and later "transaction" on the first, and waits for
        "search" from the parent on the second.
        """
        found11 = False

        (r1, w1) = os.pipe()

        (r2, w2) = os.pipe()

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        # found11 is never set in this test, so this check always passes.
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        """An open search iterator does not see a record committed meanwhile.

        Same pipe handshake as test_search_against_trans, but the parent
        holds a search_iterator() open across the child's transaction.
        """
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        (r1, w1) = os.pipe()

        (r2, w2) = os.pipe()

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(res)
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
2894
2895
class LdbResultTestsLmdb(LdbResultTests):
    """Runs the LdbResultTests cases with MDB_PREFIX and MDB_INDEX_OBJ."""

    def setUp(self):
        # Both attributes must be in place before the parent setUp runs:
        # it reads self.index, and the base setUp checks self.prefix.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing specific to clean up here; defer to the parent.
        super(LdbResultTestsLmdb, self).tearDown()
2905
2906
class BadTypeTests(TestCase):
    """Checks that wrongly typed arguments are rejected with TypeError."""

    def test_control(self):
        """ldb.Control rejects a non-Ldb first argument and a non-string spec."""
        db = ldb.Ldb()  # created but not passed to ldb.Control below
        with self.assertRaises(TypeError):
            ldb.Control('<bad type>', 'relax:1')
        # The ldb module itself (not an Ldb instance) is passed here.
        with self.assertRaises(TypeError):
            ldb.Control(ldb, 1234)

    def test_modify(self):
        """modify() rejects a string message and a string second argument."""
        db = ldb.Ldb()
        message = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.modify('<bad type>')
        with self.assertRaises(TypeError):
            db.modify(message, '<bad type>')

    def test_add(self):
        """add() rejects a string message and a string second argument."""
        db = ldb.Ldb()
        message = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(message, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite the test name, this exercises db.add rather
        # than db.delete -- confirm which call was intended.
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(target, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite the test name, this exercises db.add rather
        # than db.rename -- confirm which call was intended.
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>', target)
        with self.assertRaises(TypeError):
            db.add(target, '<bad type>')
        with self.assertRaises(TypeError):
            db.add(target, target, '<bad type>')

    def test_search(self):
        """search() rejects a wrongly typed value for each keyword argument."""
        db = ldb.Ldb()
        bad_keywords = (
            ("base", 1234),
            ("scope", '<bad type>'),
            ("expression", 1234),
            ("attrs", '<bad type>'),
            ("controls", '<bad type>'),
        )
        for keyword, bad_value in bad_keywords:
            self.assertRaises(TypeError, db.search, **{keyword: bad_value})
2947
2948
class VersionTests(TestCase):
    """Checks on the version metadata exposed by the ldb module."""

    def test_version(self):
        """ldb.__version__ is a str instance."""
        version = ldb.__version__
        self.assertTrue(isinstance(version, str))
2953
2954
if __name__ == '__main__':
    # Run every test in this module when the file is executed as a script.
    import unittest
    unittest.main()