PEP8: fix E502: the backslash is redundant between brackets
[amitay/samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the interpreter's version tuple compares greater than (3, 0).
PY3 = sys.version_info > (3, 0)

# URL scheme prefixes that LdbBaseTest.url() prepends to the database filename.
# TDB_PREFIX is the default picked by LdbBaseTest.setUp(); MDB_PREFIX also makes
# LdbBaseTest.flags() return ldb.FLG_NOSYNC.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"

# "@INDEXLIST" record that SimpleLdbLmdb assigns to self.index, so that the
# fixture setUp() adds it to the freshly created database.
MDB_INDEX_OBJ = {
    "dn": "@INDEXLIST",
    "@IDXONE": [b"1"],
    "@IDXGUID": [b"objectUUID"],
    "@IDX_DN_GUID": [b"GUID"]
}
24
25
def tempdir():
    """Create a new temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set, the directory is
    created below "<SELFTEST_PREFIX>/tmp"; otherwise tempfile's default
    location is used.
    """
    import tempfile
    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
33
34
class NoContextTests(TestCase):
    """Tests for module-level ldb helpers that need no Ldb connection."""

    def test_valid_attr_name(self):
        """valid_attr_name() accepts "foo" and rejects "24foo"."""
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        """timestring() renders epoch seconds as an LDAP time string."""
        for expected, seconds in (("19700101000000.0Z", 0),
                                  ("20071119191012.0Z", 1195499412)):
            self.assertEqual(expected, ldb.timestring(seconds))

    def test_string_to_time(self):
        """string_to_time() parses an LDAP time string into epoch seconds."""
        for seconds, stamp in ((0, "19700101000000.0Z"),
                               (1195499412, "20071119191012.0Z")):
            self.assertEqual(seconds, ldb.string_to_time(stamp))

    def test_binary_encode(self):
        """binary_encode() round-trips and treats bytes and text alike."""
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, raw)

        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
56
57
class LdbBaseTest(TestCase):
    """Shared fixture helpers: backend prefix, database URL and open flags."""

    def setUp(self):
        """Fall back to TDB_PREFIX when self.prefix is missing or None."""
        super(LdbBaseTest, self).setUp()
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        """Delegate to the parent tearDown; nothing else is released here."""
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the backend prefix followed by the database filename."""
        backend, path = self.prefix, self.filename
        return backend + path

    def flags(self):
        """Return ldb.FLG_NOSYNC for the mdb:// prefix, otherwise 0."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
78
79
80 class SimpleLdb(LdbBaseTest):
81
82     def setUp(self):
83         super(SimpleLdb, self).setUp()
84         self.testdir = tempdir()
85         self.filename = os.path.join(self.testdir, "test.ldb")
86         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
87         try:
88             self.ldb.add(self.index)
89         except AttributeError:
90             pass
91
92     def tearDown(self):
93         shutil.rmtree(self.testdir)
94         super(SimpleLdb, self).tearDown()
95         # Ensure the LDB is closed now, so we close the FD
96         del(self.ldb)
97
98     def test_connect(self):
99         ldb.Ldb(self.url(), flags=self.flags())
100
101     def test_connect_none(self):
102         ldb.Ldb()
103
104     def test_connect_later(self):
105         x = ldb.Ldb()
106         x.connect(self.url(), flags=self.flags())
107
108     def test_repr(self):
109         x = ldb.Ldb()
110         self.assertTrue(repr(x).startswith("<ldb connection"))
111
112     def test_set_create_perms(self):
113         x = ldb.Ldb()
114         x.set_create_perms(0o600)
115
116     def test_modules_none(self):
117         x = ldb.Ldb()
118         self.assertEqual([], x.modules())
119
120     def test_modules_tdb(self):
121         x = ldb.Ldb(self.url(), flags=self.flags())
122         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
123
124     def test_firstmodule_none(self):
125         x = ldb.Ldb()
126         self.assertEqual(x.firstmodule, None)
127
128     def test_firstmodule_tdb(self):
129         x = ldb.Ldb(self.url(), flags=self.flags())
130         mod = x.firstmodule
131         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
132
133     def test_search(self):
134         l = ldb.Ldb(self.url(), flags=self.flags())
135         self.assertEqual(len(l.search()), 0)
136
137     def test_search_controls(self):
138         l = ldb.Ldb(self.url(), flags=self.flags())
139         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
140
141     def test_search_attrs(self):
142         l = ldb.Ldb(self.url(), flags=self.flags())
143         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
144
145     def test_search_string_dn(self):
146         l = ldb.Ldb(self.url(), flags=self.flags())
147         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
148
149     def test_search_attr_string(self):
150         l = ldb.Ldb(self.url(), flags=self.flags())
151         self.assertRaises(TypeError, l.search, attrs="dc")
152         self.assertRaises(TypeError, l.search, attrs=b"dc")
153
154     def test_opaque(self):
155         l = ldb.Ldb(self.url(), flags=self.flags())
156         l.set_opaque("my_opaque", l)
157         self.assertTrue(l.get_opaque("my_opaque") is not None)
158         self.assertEqual(None, l.get_opaque("unknown"))
159
160     def test_search_scope_base_empty_db(self):
161         l = ldb.Ldb(self.url(), flags=self.flags())
162         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
163                                       ldb.SCOPE_BASE)), 0)
164
165     def test_search_scope_onelevel_empty_db(self):
166         l = ldb.Ldb(self.url(), flags=self.flags())
167         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
168                                       ldb.SCOPE_ONELEVEL)), 0)
169
170     def test_delete(self):
171         l = ldb.Ldb(self.url(), flags=self.flags())
172         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
173
174     def test_delete_w_unhandled_ctrl(self):
175         l = ldb.Ldb(self.url(), flags=self.flags())
176         m = ldb.Message()
177         m.dn = ldb.Dn(l, "dc=foo1")
178         m["b"] = [b"a"]
179         m["objectUUID"] = b"0123456789abcdef"
180         l.add(m)
181         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
182         l.delete(m.dn)
183
184     def test_contains(self):
185         name = self.url()
186         l = ldb.Ldb(name, flags=self.flags())
187         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
188         l = ldb.Ldb(name, flags=self.flags())
189         m = ldb.Message()
190         m.dn = ldb.Dn(l, "dc=foo3")
191         m["b"] = ["a"]
192         m["objectUUID"] = b"0123456789abcdef"
193         l.add(m)
194         try:
195             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
196             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
197         finally:
198             l.delete(m.dn)
199
200     def test_get_config_basedn(self):
201         l = ldb.Ldb(self.url(), flags=self.flags())
202         self.assertEqual(None, l.get_config_basedn())
203
204     def test_get_root_basedn(self):
205         l = ldb.Ldb(self.url(), flags=self.flags())
206         self.assertEqual(None, l.get_root_basedn())
207
208     def test_get_schema_basedn(self):
209         l = ldb.Ldb(self.url(), flags=self.flags())
210         self.assertEqual(None, l.get_schema_basedn())
211
212     def test_get_default_basedn(self):
213         l = ldb.Ldb(self.url(), flags=self.flags())
214         self.assertEqual(None, l.get_default_basedn())
215
216     def test_add(self):
217         l = ldb.Ldb(self.url(), flags=self.flags())
218         m = ldb.Message()
219         m.dn = ldb.Dn(l, "dc=foo4")
220         m["bla"] = b"bla"
221         m["objectUUID"] = b"0123456789abcdef"
222         self.assertEqual(len(l.search()), 0)
223         l.add(m)
224         try:
225             self.assertEqual(len(l.search()), 1)
226         finally:
227             l.delete(ldb.Dn(l, "dc=foo4"))
228
229     def test_search_iterator(self):
230         l = ldb.Ldb(self.url(), flags=self.flags())
231         s = l.search_iterator()
232         s.abandon()
233         try:
234             for me in s:
235                 self.fail()
236             self.fail()
237         except RuntimeError as re:
238             pass
239         try:
240             s.abandon()
241             self.fail()
242         except RuntimeError as re:
243             pass
244         try:
245             s.result()
246             self.fail()
247         except RuntimeError as re:
248             pass
249
250         s = l.search_iterator()
251         count = 0
252         for me in s:
253             self.assertTrue(isinstance(me, ldb.Message))
254             count += 1
255         r = s.result()
256         self.assertEqual(len(r), 0)
257         self.assertEqual(count, 0)
258
259         m1 = ldb.Message()
260         m1.dn = ldb.Dn(l, "dc=foo4")
261         m1["bla"] = b"bla"
262         m1["objectUUID"] = b"0123456789abcdef"
263         l.add(m1)
264         try:
265             s = l.search_iterator()
266             msgs = []
267             for me in s:
268                 self.assertTrue(isinstance(me, ldb.Message))
269                 count += 1
270                 msgs.append(me)
271             r = s.result()
272             self.assertEqual(len(r), 0)
273             self.assertEqual(len(msgs), 1)
274             self.assertEqual(msgs[0].dn, m1.dn)
275
276             m2 = ldb.Message()
277             m2.dn = ldb.Dn(l, "dc=foo5")
278             m2["bla"] = b"bla"
279             m2["objectUUID"] = b"0123456789abcdee"
280             l.add(m2)
281
282             s = l.search_iterator()
283             msgs = []
284             for me in s:
285                 self.assertTrue(isinstance(me, ldb.Message))
286                 count += 1
287                 msgs.append(me)
288             r = s.result()
289             self.assertEqual(len(r), 0)
290             self.assertEqual(len(msgs), 2)
291             if msgs[0].dn == m1.dn:
292                 self.assertEqual(msgs[0].dn, m1.dn)
293                 self.assertEqual(msgs[1].dn, m2.dn)
294             else:
295                 self.assertEqual(msgs[0].dn, m2.dn)
296                 self.assertEqual(msgs[1].dn, m1.dn)
297
298             s = l.search_iterator()
299             msgs = []
300             for me in s:
301                 self.assertTrue(isinstance(me, ldb.Message))
302                 count += 1
303                 msgs.append(me)
304                 break
305             try:
306                 s.result()
307                 self.fail()
308             except RuntimeError as re:
309                 pass
310             for me in s:
311                 self.assertTrue(isinstance(me, ldb.Message))
312                 count += 1
313                 msgs.append(me)
314                 break
315             for me in s:
316                 self.fail()
317
318             r = s.result()
319             self.assertEqual(len(r), 0)
320             self.assertEqual(len(msgs), 2)
321             if msgs[0].dn == m1.dn:
322                 self.assertEqual(msgs[0].dn, m1.dn)
323                 self.assertEqual(msgs[1].dn, m2.dn)
324             else:
325                 self.assertEqual(msgs[0].dn, m2.dn)
326                 self.assertEqual(msgs[1].dn, m1.dn)
327         finally:
328             l.delete(ldb.Dn(l, "dc=foo4"))
329             l.delete(ldb.Dn(l, "dc=foo5"))
330
331     def test_add_text(self):
332         l = ldb.Ldb(self.url(), flags=self.flags())
333         m = ldb.Message()
334         m.dn = ldb.Dn(l, "dc=foo4")
335         m["bla"] = "bla"
336         m["objectUUID"] = b"0123456789abcdef"
337         self.assertEqual(len(l.search()), 0)
338         l.add(m)
339         try:
340             self.assertEqual(len(l.search()), 1)
341         finally:
342             l.delete(ldb.Dn(l, "dc=foo4"))
343
344     def test_add_w_unhandled_ctrl(self):
345         l = ldb.Ldb(self.url(), flags=self.flags())
346         m = ldb.Message()
347         m.dn = ldb.Dn(l, "dc=foo4")
348         m["bla"] = b"bla"
349         self.assertEqual(len(l.search()), 0)
350         self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
351
352     def test_add_dict(self):
353         l = ldb.Ldb(self.url(), flags=self.flags())
354         m = {"dn": ldb.Dn(l, "dc=foo5"),
355              "bla": b"bla",
356              "objectUUID": b"0123456789abcdef"}
357         self.assertEqual(len(l.search()), 0)
358         l.add(m)
359         try:
360             self.assertEqual(len(l.search()), 1)
361         finally:
362             l.delete(ldb.Dn(l, "dc=foo5"))
363
364     def test_add_dict_text(self):
365         l = ldb.Ldb(self.url(), flags=self.flags())
366         m = {"dn": ldb.Dn(l, "dc=foo5"),
367              "bla": "bla",
368              "objectUUID": b"0123456789abcdef"}
369         self.assertEqual(len(l.search()), 0)
370         l.add(m)
371         try:
372             self.assertEqual(len(l.search()), 1)
373         finally:
374             l.delete(ldb.Dn(l, "dc=foo5"))
375
376     def test_add_dict_string_dn(self):
377         l = ldb.Ldb(self.url(), flags=self.flags())
378         m = {"dn": "dc=foo6", "bla": b"bla",
379              "objectUUID": b"0123456789abcdef"}
380         self.assertEqual(len(l.search()), 0)
381         l.add(m)
382         try:
383             self.assertEqual(len(l.search()), 1)
384         finally:
385             l.delete(ldb.Dn(l, "dc=foo6"))
386
387     def test_add_dict_bytes_dn(self):
388         l = ldb.Ldb(self.url(), flags=self.flags())
389         m = {"dn": b"dc=foo6", "bla": b"bla",
390              "objectUUID": b"0123456789abcdef"}
391         self.assertEqual(len(l.search()), 0)
392         l.add(m)
393         try:
394             self.assertEqual(len(l.search()), 1)
395         finally:
396             l.delete(ldb.Dn(l, "dc=foo6"))
397
398     def test_rename(self):
399         l = ldb.Ldb(self.url(), flags=self.flags())
400         m = ldb.Message()
401         m.dn = ldb.Dn(l, "dc=foo7")
402         m["bla"] = b"bla"
403         m["objectUUID"] = b"0123456789abcdef"
404         self.assertEqual(len(l.search()), 0)
405         l.add(m)
406         try:
407             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
408             self.assertEqual(len(l.search()), 1)
409         finally:
410             l.delete(ldb.Dn(l, "dc=bar"))
411
412     def test_rename_string_dns(self):
413         l = ldb.Ldb(self.url(), flags=self.flags())
414         m = ldb.Message()
415         m.dn = ldb.Dn(l, "dc=foo8")
416         m["bla"] = b"bla"
417         m["objectUUID"] = b"0123456789abcdef"
418         self.assertEqual(len(l.search()), 0)
419         l.add(m)
420         self.assertEqual(len(l.search()), 1)
421         try:
422             l.rename("dc=foo8", "dc=bar")
423             self.assertEqual(len(l.search()), 1)
424         finally:
425             l.delete(ldb.Dn(l, "dc=bar"))
426
427     def test_rename_bad_string_dns(self):
428         l = ldb.Ldb(self.url(), flags=self.flags())
429         m = ldb.Message()
430         m.dn = ldb.Dn(l, "dc=foo8")
431         m["bla"] = b"bla"
432         m["objectUUID"] = b"0123456789abcdef"
433         self.assertEqual(len(l.search()), 0)
434         l.add(m)
435         self.assertEqual(len(l.search()), 1)
436         self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
437         self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
438         l.delete(ldb.Dn(l, "dc=foo8"))
439
440     def test_empty_dn(self):
441         l = ldb.Ldb(self.url(), flags=self.flags())
442         self.assertEqual(0, len(l.search()))
443         m = ldb.Message()
444         m.dn = ldb.Dn(l, "dc=empty")
445         m["objectUUID"] = b"0123456789abcdef"
446         l.add(m)
447         rm = l.search()
448         self.assertEqual(1, len(rm))
449         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
450                          set(rm[0].keys()))
451
452         rm = l.search(m.dn)
453         self.assertEqual(1, len(rm))
454         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
455                          set(rm[0].keys()))
456         rm = l.search(m.dn, attrs=["blah"])
457         self.assertEqual(1, len(rm))
458         self.assertEqual(0, len(rm[0]))
459
460     def test_modify_delete(self):
461         l = ldb.Ldb(self.url(), flags=self.flags())
462         m = ldb.Message()
463         m.dn = ldb.Dn(l, "dc=modifydelete")
464         m["bla"] = [b"1234"]
465         m["objectUUID"] = b"0123456789abcdef"
466         l.add(m)
467         rm = l.search(m.dn)[0]
468         self.assertEqual([b"1234"], list(rm["bla"]))
469         try:
470             m = ldb.Message()
471             m.dn = ldb.Dn(l, "dc=modifydelete")
472             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
473             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
474             l.modify(m)
475             rm = l.search(m.dn)
476             self.assertEqual(1, len(rm))
477             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
478                              set(rm[0].keys()))
479             rm = l.search(m.dn, attrs=["bla"])
480             self.assertEqual(1, len(rm))
481             self.assertEqual(0, len(rm[0]))
482         finally:
483             l.delete(ldb.Dn(l, "dc=modifydelete"))
484
485     def test_modify_delete_text(self):
486         l = ldb.Ldb(self.url(), flags=self.flags())
487         m = ldb.Message()
488         m.dn = ldb.Dn(l, "dc=modifydelete")
489         m.text["bla"] = ["1234"]
490         m["objectUUID"] = b"0123456789abcdef"
491         l.add(m)
492         rm = l.search(m.dn)[0]
493         self.assertEqual(["1234"], list(rm.text["bla"]))
494         try:
495             m = ldb.Message()
496             m.dn = ldb.Dn(l, "dc=modifydelete")
497             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
498             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
499             l.modify(m)
500             rm = l.search(m.dn)
501             self.assertEqual(1, len(rm))
502             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
503                              set(rm[0].keys()))
504             rm = l.search(m.dn, attrs=["bla"])
505             self.assertEqual(1, len(rm))
506             self.assertEqual(0, len(rm[0]))
507         finally:
508             l.delete(ldb.Dn(l, "dc=modifydelete"))
509
510     def test_modify_add(self):
511         l = ldb.Ldb(self.url(), flags=self.flags())
512         m = ldb.Message()
513         m.dn = ldb.Dn(l, "dc=add")
514         m["bla"] = [b"1234"]
515         m["objectUUID"] = b"0123456789abcdef"
516         l.add(m)
517         try:
518             m = ldb.Message()
519             m.dn = ldb.Dn(l, "dc=add")
520             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
521             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
522             l.modify(m)
523             rm = l.search(m.dn)[0]
524             self.assertEqual(3, len(rm))
525             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
526         finally:
527             l.delete(ldb.Dn(l, "dc=add"))
528
529     def test_modify_add_text(self):
530         l = ldb.Ldb(self.url(), flags=self.flags())
531         m = ldb.Message()
532         m.dn = ldb.Dn(l, "dc=add")
533         m.text["bla"] = ["1234"]
534         m["objectUUID"] = b"0123456789abcdef"
535         l.add(m)
536         try:
537             m = ldb.Message()
538             m.dn = ldb.Dn(l, "dc=add")
539             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
540             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
541             l.modify(m)
542             rm = l.search(m.dn)[0]
543             self.assertEqual(3, len(rm))
544             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
545         finally:
546             l.delete(ldb.Dn(l, "dc=add"))
547
548     def test_modify_replace(self):
549         l = ldb.Ldb(self.url(), flags=self.flags())
550         m = ldb.Message()
551         m.dn = ldb.Dn(l, "dc=modify2")
552         m["bla"] = [b"1234", b"456"]
553         m["objectUUID"] = b"0123456789abcdef"
554         l.add(m)
555         try:
556             m = ldb.Message()
557             m.dn = ldb.Dn(l, "dc=modify2")
558             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
559             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
560             l.modify(m)
561             rm = l.search(m.dn)[0]
562             self.assertEqual(3, len(rm))
563             self.assertEqual([b"789"], list(rm["bla"]))
564             rm = l.search(m.dn, attrs=["bla"])[0]
565             self.assertEqual(1, len(rm))
566         finally:
567             l.delete(ldb.Dn(l, "dc=modify2"))
568
569     def test_modify_replace_text(self):
570         l = ldb.Ldb(self.url(), flags=self.flags())
571         m = ldb.Message()
572         m.dn = ldb.Dn(l, "dc=modify2")
573         m.text["bla"] = ["1234", "456"]
574         m["objectUUID"] = b"0123456789abcdef"
575         l.add(m)
576         try:
577             m = ldb.Message()
578             m.dn = ldb.Dn(l, "dc=modify2")
579             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
580             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
581             l.modify(m)
582             rm = l.search(m.dn)[0]
583             self.assertEqual(3, len(rm))
584             self.assertEqual(["789"], list(rm.text["bla"]))
585             rm = l.search(m.dn, attrs=["bla"])[0]
586             self.assertEqual(1, len(rm))
587         finally:
588             l.delete(ldb.Dn(l, "dc=modify2"))
589
590     def test_modify_flags_change(self):
591         l = ldb.Ldb(self.url(), flags=self.flags())
592         m = ldb.Message()
593         m.dn = ldb.Dn(l, "dc=add")
594         m["bla"] = [b"1234"]
595         m["objectUUID"] = b"0123456789abcdef"
596         l.add(m)
597         try:
598             m = ldb.Message()
599             m.dn = ldb.Dn(l, "dc=add")
600             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
601             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
602             l.modify(m)
603             rm = l.search(m.dn)[0]
604             self.assertEqual(3, len(rm))
605             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
606
607             # Now create another modify, but switch the flags before we do it
608             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
609             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
610             l.modify(m)
611             rm = l.search(m.dn, attrs=["bla"])[0]
612             self.assertEqual(1, len(rm))
613             self.assertEqual([b"1234"], list(rm["bla"]))
614         finally:
615             l.delete(ldb.Dn(l, "dc=add"))
616
617     def test_modify_flags_change_text(self):
618         l = ldb.Ldb(self.url(), flags=self.flags())
619         m = ldb.Message()
620         m.dn = ldb.Dn(l, "dc=add")
621         m.text["bla"] = ["1234"]
622         m["objectUUID"] = b"0123456789abcdef"
623         l.add(m)
624         try:
625             m = ldb.Message()
626             m.dn = ldb.Dn(l, "dc=add")
627             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
628             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
629             l.modify(m)
630             rm = l.search(m.dn)[0]
631             self.assertEqual(3, len(rm))
632             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
633
634             # Now create another modify, but switch the flags before we do it
635             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
636             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
637             l.modify(m)
638             rm = l.search(m.dn, attrs=["bla"])[0]
639             self.assertEqual(1, len(rm))
640             self.assertEqual(["1234"], list(rm.text["bla"]))
641         finally:
642             l.delete(ldb.Dn(l, "dc=add"))
643
644     def test_transaction_commit(self):
645         l = ldb.Ldb(self.url(), flags=self.flags())
646         l.transaction_start()
647         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
648         m["foo"] = [b"bar"]
649         m["objectUUID"] = b"0123456789abcdef"
650         l.add(m)
651         l.transaction_commit()
652         l.delete(m.dn)
653
654     def test_transaction_cancel(self):
655         l = ldb.Ldb(self.url(), flags=self.flags())
656         l.transaction_start()
657         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
658         m["foo"] = [b"bar"]
659         m["objectUUID"] = b"0123456789abcdee"
660         l.add(m)
661         l.transaction_cancel()
662         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
663
664     def test_set_debug(self):
665         def my_report_fn(level, text):
666             pass
667         l = ldb.Ldb(self.url(), flags=self.flags())
668         l.set_debug(my_report_fn)
669
670     def test_zero_byte_string(self):
671         """Testing we do not get trapped in the \0 byte in a property string."""
672         l = ldb.Ldb(self.url(), flags=self.flags())
673         l.add({
674             "dn": b"dc=somedn",
675             "objectclass": b"user",
676             "cN": b"LDAPtestUSER",
677             "givenname": b"ldap",
678             "displayname": b"foo\0bar",
679             "objectUUID": b"0123456789abcdef"
680         })
681         res = l.search(expression="(dn=dc=somedn)")
682         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
683
684     def test_no_crash_broken_expr(self):
685         l = ldb.Ldb(self.url(), flags=self.flags())
686         self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
687
688 # Run the SimpleLdb tests against an lmdb backend
689
690
class SimpleLdbLmdb(SimpleLdb):
    """Re-run the SimpleLdb tests with the mdb:// prefix and index record."""

    def setUp(self):
        """Select MDB_PREFIX and MDB_INDEX_OBJ, then build the fixture."""
        self.index, self.prefix = MDB_INDEX_OBJ, MDB_PREFIX
        super(SimpleLdbLmdb, self).setUp()

    def tearDown(self):
        """Tear down through SimpleLdb; nothing extra is done here."""
        super(SimpleLdbLmdb, self).tearDown()
700
701
702 class SearchTests(LdbBaseTest):
703     def tearDown(self):
704         shutil.rmtree(self.testdir)
705         super(SearchTests, self).tearDown()
706
707         # Ensure the LDB is closed now, so we close the FD
708         del(self.l)
709
710     def setUp(self):
711         super(SearchTests, self).setUp()
712         self.testdir = tempdir()
713         self.filename = os.path.join(self.testdir, "search_test.ldb")
714         options = ["modules:rdn_name"]
715         if hasattr(self, 'IDXCHECK'):
716             options.append("disable_full_db_scan_for_self_test:1")
717         self.l = ldb.Ldb(self.url(),
718                          flags=self.flags(),
719                          options=options)
720         try:
721             self.l.add(self.index)
722         except AttributeError:
723             pass
724
725         self.l.add({"dn": "@ATTRIBUTES",
726                     "DC": "CASE_INSENSITIVE"})
727
728         # Note that we can't use the name objectGUID here, as we
729         # want to stay clear of the objectGUID handler in LDB and
730         # instead use just the 16 bytes raw, which we just keep
731         # to printable chars here for ease of handling.
732
733         self.l.add({"dn": "DC=SAMBA,DC=ORG",
734                     "name": b"samba.org",
735                     "objectUUID": b"0123456789abcdef"})
736         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
737                     "name": b"Admins",
738                     "x": "z", "y": "a",
739                     "objectUUID": b"0123456789abcde1"})
740         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
741                     "name": b"Users",
742                     "x": "z", "y": "a",
743                     "objectUUID": b"0123456789abcde2"})
744         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
745                     "name": b"OU #1",
746                     "x": "y", "y": "a",
747                     "objectUUID": b"0123456789abcde3"})
748         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
749                     "name": b"OU #2",
750                     "x": "y", "y": "a",
751                     "objectUUID": b"0123456789abcde4"})
752         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
753                     "name": b"OU #3",
754                     "x": "y", "y": "a",
755                     "objectUUID": b"0123456789abcde5"})
756         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
757                     "name": b"OU #4",
758                     "x": "y", "y": "a",
759                     "objectUUID": b"0123456789abcde6"})
760         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
761                     "name": b"OU #5",
762                     "x": "y", "y": "a",
763                     "objectUUID": b"0123456789abcde7"})
764         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
765                     "name": b"OU #6",
766                     "x": "y", "y": "a",
767                     "objectUUID": b"0123456789abcde8"})
768         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
769                     "name": b"OU #7",
770                     "x": "y", "y": "a",
771                     "objectUUID": b"0123456789abcde9"})
772         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
773                     "name": b"OU #8",
774                     "x": "y", "y": "a",
775                     "objectUUID": b"0123456789abcde0"})
776         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
777                     "name": b"OU #9",
778                     "x": "y", "y": "a",
779                     "objectUUID": b"0123456789abcdea"})
780         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
781                     "name": b"OU #10",
782                     "x": "y", "y": "a",
783                     "objectUUID": b"0123456789abcdeb"})
784         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
785                     "name": b"OU #10",
786                     "x": "y", "y": "a",
787                     "objectUUID": b"0123456789abcdec"})
788         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
789                     "name": b"OU #10",
790                     "x": "y", "y": "b",
791                     "objectUUID": b"0123456789abcded"})
792         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
793                     "name": b"OU #10",
794                     "x": "x", "y": "b",
795                     "objectUUID": b"0123456789abcdee"})
796         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
797                     "name": b"OU #10",
798                     "x": "x", "y": "b",
799                     "objectUUID": b"0123456789abcd01"})
800         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
801                     "name": b"OU #10",
802                     "x": "x", "y": "b",
803                     "objectUUID": b"0123456789abcd02"})
804         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
805                     "name": b"OU #10",
806                     "x": "x", "y": "b",
807                     "objectUUID": b"0123456789abcd03"})
808         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
809                     "name": b"OU #10",
810                     "x": "x", "y": "b",
811                     "objectUUID": b"0123456789abcd04"})
812         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
813                     "name": b"OU #10",
814                     "x": "x", "y": "b",
815                     "objectUUID": b"0123456789abcd05"})
816         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
817                     "name": b"OU #10",
818                     "x": "x", "y": "b",
819                     "objectUUID": b"0123456789abcd06"})
820         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
821                     "name": b"OU #10",
822                     "x": "x", "y": "b",
823                     "objectUUID": b"0123456789abcd07"})
824         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
825                     "name": b"OU #10",
826                     "x": "x", "y": "c",
827                     "objectUUID": b"0123456789abcd08"})
828         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
829                     "name": b"OU #10",
830                     "x": "x", "y": "c",
831                     "objectUUID": b"0123456789abcd09"})
832
833     def test_base(self):
834         """Testing a search"""
835
836         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
837                               scope=ldb.SCOPE_BASE)
838         self.assertEqual(len(res11), 1)
839
840     def test_base_lower(self):
841         """Testing a search"""
842
843         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
844                               scope=ldb.SCOPE_BASE)
845         self.assertEqual(len(res11), 1)
846
847     def test_base_or(self):
848         """Testing a search"""
849
850         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
851                               scope=ldb.SCOPE_BASE,
852                               expression="(|(ou=ou11)(ou=ou12))")
853         self.assertEqual(len(res11), 1)
854
855     def test_base_or2(self):
856         """Testing a search"""
857
858         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
859                               scope=ldb.SCOPE_BASE,
860                               expression="(|(x=y)(y=b))")
861         self.assertEqual(len(res11), 1)
862
863     def test_base_and(self):
864         """Testing a search"""
865
866         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
867                               scope=ldb.SCOPE_BASE,
868                               expression="(&(ou=ou11)(ou=ou12))")
869         self.assertEqual(len(res11), 0)
870
871     def test_base_and2(self):
872         """Testing a search"""
873
874         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
875                               scope=ldb.SCOPE_BASE,
876                               expression="(&(x=y)(y=a))")
877         self.assertEqual(len(res11), 1)
878
879     def test_base_false(self):
880         """Testing a search"""
881
882         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
883                               scope=ldb.SCOPE_BASE,
884                               expression="(|(ou=ou13)(ou=ou12))")
885         self.assertEqual(len(res11), 0)
886
887     def test_check_base_false(self):
888         """Testing a search"""
889         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
890                               scope=ldb.SCOPE_BASE,
891                               expression="(|(ou=ou13)(ou=ou12))")
892         self.assertEqual(len(res11), 0)
893
894     def test_check_base_error(self):
895         """Testing a search"""
896         checkbaseonsearch = {"dn": "@OPTIONS",
897                              "checkBaseOnSearch": b"TRUE"}
898         try:
899             self.l.add(checkbaseonsearch)
900         except ldb.LdbError as err:
901             enum = err.args[0]
902             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
903             m = ldb.Message.from_dict(self.l,
904                                       checkbaseonsearch)
905             self.l.modify(m)
906
907         try:
908             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
909                                   scope=ldb.SCOPE_BASE,
910                                   expression="(|(ou=ou13)(ou=ou12))")
911             self.fail("Should have failed on missing base")
912         except ldb.LdbError as err:
913             enum = err.args[0]
914             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
915
916     def test_subtree_and(self):
917         """Testing a search"""
918
919         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
920                               scope=ldb.SCOPE_SUBTREE,
921                               expression="(&(ou=ou11)(ou=ou12))")
922         self.assertEqual(len(res11), 0)
923
924     def test_subtree_and2(self):
925         """Testing a search"""
926
927         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
928                               scope=ldb.SCOPE_SUBTREE,
929                               expression="(&(x=y)(|(y=b)(y=c)))")
930         self.assertEqual(len(res11), 1)
931
932     def test_subtree_and2_lower(self):
933         """Testing a search"""
934
935         res11 = self.l.search(base="DC=samba,DC=org",
936                               scope=ldb.SCOPE_SUBTREE,
937                               expression="(&(x=y)(|(y=b)(y=c)))")
938         self.assertEqual(len(res11), 1)
939
940     def test_subtree_or(self):
941         """Testing a search"""
942
943         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
944                               scope=ldb.SCOPE_SUBTREE,
945                               expression="(|(ou=ou11)(ou=ou12))")
946         self.assertEqual(len(res11), 2)
947
948     def test_subtree_or2(self):
949         """Testing a search"""
950
951         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
952                               scope=ldb.SCOPE_SUBTREE,
953                               expression="(|(x=y)(y=b))")
954         self.assertEqual(len(res11), 20)
955
956     def test_subtree_or3(self):
957         """Testing a search"""
958
959         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
960                               scope=ldb.SCOPE_SUBTREE,
961                               expression="(|(x=y)(y=b)(y=c))")
962         self.assertEqual(len(res11), 22)
963
964     def test_one_and(self):
965         """Testing a search"""
966
967         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
968                               scope=ldb.SCOPE_ONELEVEL,
969                               expression="(&(ou=ou11)(ou=ou12))")
970         self.assertEqual(len(res11), 0)
971
972     def test_one_and2(self):
973         """Testing a search"""
974
975         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
976                               scope=ldb.SCOPE_ONELEVEL,
977                               expression="(&(x=y)(y=b))")
978         self.assertEqual(len(res11), 1)
979
980     def test_one_or(self):
981         """Testing a search"""
982
983         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
984                               scope=ldb.SCOPE_ONELEVEL,
985                               expression="(|(ou=ou11)(ou=ou12))")
986         self.assertEqual(len(res11), 2)
987
988     def test_one_or2(self):
989         """Testing a search"""
990
991         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
992                               scope=ldb.SCOPE_ONELEVEL,
993                               expression="(|(x=y)(y=b))")
994         self.assertEqual(len(res11), 20)
995
996     def test_one_or2_lower(self):
997         """Testing a search"""
998
999         res11 = self.l.search(base="DC=samba,DC=org",
1000                               scope=ldb.SCOPE_ONELEVEL,
1001                               expression="(|(x=y)(y=b))")
1002         self.assertEqual(len(res11), 20)
1003
1004     def test_one_unindexable(self):
1005         """Testing a search"""
1006
1007         try:
1008             res11 = self.l.search(base="DC=samba,DC=org",
1009                                   scope=ldb.SCOPE_ONELEVEL,
1010                                   expression="(y=b*)")
1011             if hasattr(self, 'IDX') and \
1012                not hasattr(self, 'IDXONE') and \
1013                hasattr(self, 'IDXCHECK'):
1014                 self.fail("Should have failed as un-indexed search")
1015
1016             self.assertEqual(len(res11), 9)
1017
1018         except ldb.LdbError as err:
1019             enum = err.args[0]
1020             estr = err.args[1]
1021             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1022             self.assertIn(estr, "ldb FULL SEARCH disabled")
1023
1024     def test_one_unindexable_presence(self):
1025         """Testing a search"""
1026
1027         try:
1028             res11 = self.l.search(base="DC=samba,DC=org",
1029                                   scope=ldb.SCOPE_ONELEVEL,
1030                                   expression="(y=*)")
1031             if hasattr(self, 'IDX') and \
1032                not hasattr(self, 'IDXONE') and \
1033                hasattr(self, 'IDXCHECK'):
1034                 self.fail("Should have failed as un-indexed search")
1035
1036             self.assertEqual(len(res11), 24)
1037
1038         except ldb.LdbError as err:
1039             enum = err.args[0]
1040             estr = err.args[1]
1041             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1042             self.assertIn(estr, "ldb FULL SEARCH disabled")
1043
1044     def test_subtree_and_or(self):
1045         """Testing a search"""
1046
1047         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1048                               scope=ldb.SCOPE_SUBTREE,
1049                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1050         self.assertEqual(len(res11), 0)
1051
1052     def test_subtree_and_or2(self):
1053         """Testing a search"""
1054
1055         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1056                               scope=ldb.SCOPE_SUBTREE,
1057                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1058         self.assertEqual(len(res11), 0)
1059
1060     def test_subtree_and_or3(self):
1061         """Testing a search"""
1062
1063         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1064                               scope=ldb.SCOPE_SUBTREE,
1065                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1066         self.assertEqual(len(res11), 2)
1067
1068     def test_subtree_and_or4(self):
1069         """Testing a search"""
1070
1071         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1072                               scope=ldb.SCOPE_SUBTREE,
1073                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1074         self.assertEqual(len(res11), 2)
1075
1076     def test_subtree_and_or5(self):
1077         """Testing a search"""
1078
1079         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1080                               scope=ldb.SCOPE_SUBTREE,
1081                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1082         self.assertEqual(len(res11), 1)
1083
1084     def test_subtree_or_and(self):
1085         """Testing a search"""
1086
1087         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1088                               scope=ldb.SCOPE_SUBTREE,
1089                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1090         self.assertEqual(len(res11), 10)
1091
1092     def test_subtree_large_and_unique(self):
1093         """Testing a search"""
1094
1095         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1096                               scope=ldb.SCOPE_SUBTREE,
1097                               expression="(&(ou=ou10)(y=a))")
1098         self.assertEqual(len(res11), 1)
1099
1100     def test_subtree_and_none(self):
1101         """Testing a search"""
1102
1103         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1104                               scope=ldb.SCOPE_SUBTREE,
1105                               expression="(&(ou=ouX)(y=a))")
1106         self.assertEqual(len(res11), 0)
1107
1108     def test_subtree_and_idx_record(self):
1109         """Testing a search against the index record"""
1110
1111         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1112                               scope=ldb.SCOPE_SUBTREE,
1113                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1114         self.assertEqual(len(res11), 0)
1115
1116     def test_subtree_and_idxone_record(self):
1117         """Testing a search against the index record"""
1118
1119         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1120                               scope=ldb.SCOPE_SUBTREE,
1121                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1122         self.assertEqual(len(res11), 0)
1123
1124     def test_subtree_unindexable(self):
1125         """Testing a search"""
1126
1127         try:
1128             res11 = self.l.search(base="DC=samba,DC=org",
1129                                   scope=ldb.SCOPE_SUBTREE,
1130                                   expression="(y=b*)")
1131             if hasattr(self, 'IDX') and \
1132                hasattr(self, 'IDXCHECK'):
1133                 self.fail("Should have failed as un-indexed search")
1134
1135             self.assertEqual(len(res11), 9)
1136
1137         except ldb.LdbError as err:
1138             enum = err.args[0]
1139             estr = err.args[1]
1140             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1141             self.assertIn(estr, "ldb FULL SEARCH disabled")
1142
1143     def test_subtree_unindexable_presence(self):
1144         """Testing a search"""
1145
1146         try:
1147             res11 = self.l.search(base="DC=samba,DC=org",
1148                                   scope=ldb.SCOPE_SUBTREE,
1149                                   expression="(y=*)")
1150             if hasattr(self, 'IDX') and \
1151                hasattr(self, 'IDXCHECK'):
1152                 self.fail("Should have failed as un-indexed search")
1153
1154             self.assertEqual(len(res11), 24)
1155
1156         except ldb.LdbError as err:
1157             enum = err.args[0]
1158             estr = err.args[1]
1159             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1160             self.assertIn(estr, "ldb FULL SEARCH disabled")
1161
1162     def test_dn_filter_one(self):
1163         """Testing that a dn= filter succeeds
1164         (or fails with disallowDNFilter
1165         set and IDXGUID or (IDX and not IDXONE) mode)
1166         when the scope is SCOPE_ONELEVEL.
1167
1168         This should be made more consistent, but for now lock in
1169         the behaviour
1170
1171         """
1172
1173         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1174                               scope=ldb.SCOPE_ONELEVEL,
1175                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1176         if hasattr(self, 'disallowDNFilter') and \
1177            hasattr(self, 'IDX') and \
1178            (hasattr(self, 'IDXGUID') or
1179             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1180             self.assertEqual(len(res11), 0)
1181         else:
1182             self.assertEqual(len(res11), 1)
1183
1184     def test_dn_filter_subtree(self):
1185         """Testing that a dn= filter succeeds
1186         (or fails with disallowDNFilter set)
1187         when the scope is SCOPE_SUBTREE"""
1188
1189         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1190                               scope=ldb.SCOPE_SUBTREE,
1191                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1192         if hasattr(self, 'disallowDNFilter') \
1193            and hasattr(self, 'IDX'):
1194             self.assertEqual(len(res11), 0)
1195         else:
1196             self.assertEqual(len(res11), 1)
1197
1198     def test_dn_filter_base(self):
1199         """Testing that (incorrectly) a dn= filter works
1200         when the scope is SCOPE_BASE"""
1201
1202         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1203                               scope=ldb.SCOPE_BASE,
1204                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1205
1206         # At some point we should fix this, but it isn't trivial
1207         self.assertEqual(len(res11), 1)
1208
1209     def test_distinguishedName_filter_one(self):
1210         """Testing that a distinguishedName= filter succeeds
1211         when the scope is SCOPE_ONELEVEL.
1212
1213         This should be made more consistent, but for now lock in
1214         the behaviour
1215
1216         """
1217
1218         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1219                               scope=ldb.SCOPE_ONELEVEL,
1220                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1221         self.assertEqual(len(res11), 1)
1222
1223     def test_distinguishedName_filter_subtree(self):
1224         """Testing that a distinguishedName= filter succeeds
1225         when the scope is SCOPE_SUBTREE"""
1226
1227         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1228                               scope=ldb.SCOPE_SUBTREE,
1229                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1230         self.assertEqual(len(res11), 1)
1231
1232     def test_distinguishedName_filter_base(self):
1233         """Testing that (incorrectly) a distinguishedName= filter works
1234         when the scope is SCOPE_BASE"""
1235
1236         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1237                               scope=ldb.SCOPE_BASE,
1238                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1239
1240         # At some point we should fix this, but it isn't trivial
1241         self.assertEqual(len(res11), 1)
1242
1243     def test_bad_dn_filter_base(self):
1244         """Testing that a dn= filter on an invalid DN works
1245         when the scope is SCOPE_BASE but
1246         returns zero results"""
1247
1248         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1249                               scope=ldb.SCOPE_BASE,
1250                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1251
1252         # At some point we should fix this, but it isn't trivial
1253         self.assertEqual(len(res11), 0)
1254
1255
1256     def test_bad_dn_filter_one(self):
1257         """Testing that a dn= filter succeeds but returns zero
1258         results when the DN is not valid on a SCOPE_ONELEVEL search
1259
1260         """
1261
1262         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1263                               scope=ldb.SCOPE_ONELEVEL,
1264                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1265         self.assertEqual(len(res11), 0)
1266
1267     def test_bad_dn_filter_subtree(self):
1268         """Testing that a dn= filter succeeds but returns zero
1269         results when the DN is not valid on a SCOPE_SUBTREE search
1270
1271         """
1272
1273         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1274                               scope=ldb.SCOPE_SUBTREE,
1275                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1276         self.assertEqual(len(res11), 0)
1277
1278     def test_bad_distinguishedName_filter_base(self):
1279         """Testing that a distinguishedName= filter on an invalid DN works
1280         when the scope is SCOPE_BASE but
1281         returns zero results"""
1282
1283         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1284                               scope=ldb.SCOPE_BASE,
1285                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1286
1287         # At some point we should fix this, but it isn't trivial
1288         self.assertEqual(len(res11), 0)
1289
1290
1291     def test_bad_distinguishedName_filter_one(self):
1292         """Testing that a distinguishedName= filter succeeds but returns zero
1293         results when the DN is not valid on a SCOPE_ONELEVEL search
1294
1295         """
1296
1297         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1298                               scope=ldb.SCOPE_ONELEVEL,
1299                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1300         self.assertEqual(len(res11), 0)
1301
1302     def test_bad_distinguishedName_filter_subtree(self):
1303         """Testing that a distinguishedName= filter succeeds but returns zero
1304         results when the DN is not valid on a SCOPE_SUBTREE search
1305
1306         """
1307
1308         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1309                               scope=ldb.SCOPE_SUBTREE,
1310                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1311         self.assertEqual(len(res11), 0)
1312
1313     def test_bad_dn_search_base(self):
1314         """Testing with a bad base DN (SCOPE_BASE)"""
1315
1316         try:
1317             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1318                                   scope=ldb.SCOPE_BASE)
1319             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1320         except ldb.LdbError as err:
1321             enum = err.args[0]
1322             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1323
1324
1325     def test_bad_dn_search_one(self):
1326         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1327
1328         try:
1329             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1330                               scope=ldb.SCOPE_ONELEVEL)
1331             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1332         except ldb.LdbError as err:
1333             enum = err.args[0]
1334             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1335
1336     def test_bad_dn_search_subtree(self):
1337         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1338
1339         try:
1340             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1341                                   scope=ldb.SCOPE_SUBTREE)
1342             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1343         except ldb.LdbError as err:
1344             enum = err.args[0]
1345             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1346
1347
1348
1349 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """Re-run every SearchTests case with the mdb:// URL prefix and the
    MDB_INDEX_OBJ index record."""

    def setUp(self):
        # Both attributes are assigned before the parent setUp() runs.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        # No extra cleanup of our own; defer entirely to the parent.
        super(SearchTestsLmdb, self).tearDown()
1359
1360
class IndexedSearchTests(SearchTests):
    """Re-run the search tests with an @INDEXLIST record that indexes the
    x, y and ou attributes, to ensure the index doesn't break things"""

    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attribute: the inherited tests probe it with hasattr().
        self.IDX = True
1370
1371
class IndexedCheckSearchTests(IndexedSearchTests):
    """Indexed search tests with the IDXCHECK marker set
       (full scan disabled)"""

    def setUp(self):
        # The marker is assigned before the parent setUp() runs, so it is
        # already visible while the fixture is being built.
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
1379
1380
class IndexedSearchDnFilterTests(SearchTests):
    """Re-run the search tests with an x/y/ou attribute index and with
       disallowDNFilter set in @OPTIONS"""

    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attributes: the inherited tests probe them with hasattr().
        self.IDX = True
1394
1395
class IndexedAndOneLevelSearchTests(SearchTests):
    """Re-run the search tests with an x/y/ou attribute index that also
       sets @IDXONE, to ensure the index doesn't break things"""

    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Marker attributes: the inherited tests probe them with hasattr().
        self.IDXONE = True
        self.IDX = True
1407
1408
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """Indexed (including @IDXONE) search tests with the IDXCHECK marker
       set (full scan disabled)"""

    def setUp(self):
        # The marker is assigned before the parent setUp() runs, so it is
        # already visible while the fixture is being built.
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1416
1417
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Re-run the search tests with an x/y/ou attribute index including
       @IDXONE, and with disallowDNFilter and checkBaseOnSearch set in
       @OPTIONS"""

    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)
        self.checkBaseOnSearch = True
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Marker attributes: the inherited tests probe them with hasattr().
        self.IDXONE = True
        self.IDX = True
1435
1436
class GUIDIndexedSearchTests(SearchTests):
    """Re-run the search tests with an index record that sets @IDXGUID
       (objectUUID) and @IDX_DN_GUID, to ensure the index doesn't break
       things"""

    def setUp(self):
        # The index record is stored on the instance before the parent
        # setUp() runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedSearchTests, self).setUp()

        # Marker attributes: the inherited tests probe them with hasattr().
        self.IDXONE = True
        self.IDXGUID = True
1450
1451
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Re-run the search tests with a GUID index record and with
       disallowDNFilter and checkBaseOnSearch set in @OPTIONS"""

    def setUp(self):
        # The index record is stored on the instance before the parent
        # setUp() runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Marker attributes: the inherited tests probe them with hasattr().
        self.checkBaseOnSearch = True
        self.disallowDNFilter = True
        self.IDXGUID = True
        self.IDX = True
1469
1470
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Re-run the search tests with a GUID index record, the IDXONE marker,
       and disallowDNFilter and checkBaseOnSearch set in @OPTIONS"""

    def setUp(self):
        # The index record is stored on the instance before the parent
        # setUp() runs.
        # NOTE(review): the record carries no "@IDXONE" key even though the
        # IDXONE marker is set below - confirm this is intended.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Marker attributes: the inherited tests probe them with hasattr().
        self.checkBaseOnSearch = True
        self.disallowDNFilter = True
        self.IDXONE = True
        self.IDXGUID = True
        self.IDX = True
1489
1490
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """Re-run GUIDIndexedSearchTests with the mdb:// URL prefix."""

    def setUp(self):
        # The prefix is assigned before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # No extra cleanup of our own; defer entirely to the parent.
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1499
1500
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """Re-run GUIDIndexedDNFilterSearchTests with the mdb:// URL prefix."""

    def setUp(self):
        # The prefix is assigned before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # No extra cleanup of our own; defer entirely to the parent.
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1509
1510
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """Re-run GUIDAndOneLevelIndexedSearchTests with the mdb:// URL
    prefix."""

    def setUp(self):
        # The prefix is assigned before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # No extra cleanup of our own; defer entirely to the parent.
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1519
1520
1521 class AddModifyTests(LdbBaseTest):
1522     def tearDown(self):
1523         shutil.rmtree(self.testdir)
1524         super(AddModifyTests, self).tearDown()
1525
1526         # Ensure the LDB is closed now, so we close the FD
1527         del(self.l)
1528
1529     def setUp(self):
1530         super(AddModifyTests, self).setUp()
1531         self.testdir = tempdir()
1532         self.filename = os.path.join(self.testdir, "add_test.ldb")
1533         self.l = ldb.Ldb(self.url(),
1534                          flags=self.flags(),
1535                          options=["modules:rdn_name"])
1536         try:
1537             self.l.add(self.index)
1538         except AttributeError:
1539             pass
1540
1541         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1542                     "name": b"samba.org",
1543                     "objectUUID": b"0123456789abcdef"})
1544         self.l.add({"dn": "@ATTRIBUTES",
1545                     "objectUUID": "UNIQUE_INDEX"})
1546
1547     def test_add_dup(self):
1548         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1549                     "name": b"Admins",
1550                     "x": "z", "y": "a",
1551                     "objectUUID": b"0123456789abcde1"})
1552         try:
1553             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1554                         "name": b"Admins",
1555                         "x": "z", "y": "a",
1556                         "objectUUID": b"0123456789abcde2"})
1557             self.fail("Should have failed adding dupliate entry")
1558         except ldb.LdbError as err:
1559             enum = err.args[0]
1560             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1561
1562     def test_add_bad(self):
1563         try:
1564             self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
1565                         "name": b"Admins",
1566                         "x": "z", "y": "a",
1567                         "objectUUID": b"0123456789abcde1"})
1568             self.fail("Should have failed adding entry with invalid DN")
1569         except ldb.LdbError as err:
1570             enum = err.args[0]
1571             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1572
1573     def test_add_del_add(self):
1574         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1575                     "name": b"Admins",
1576                     "x": "z", "y": "a",
1577                     "objectUUID": b"0123456789abcde1"})
1578         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1579         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1580                     "name": b"Admins",
1581                     "x": "z", "y": "a",
1582                     "objectUUID": b"0123456789abcde2"})
1583
1584     def test_add_move_add(self):
1585         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1586                     "name": b"Admins",
1587                     "x": "z", "y": "a",
1588                     "objectUUID": b"0123456789abcde1"})
1589         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1590                       "OU=DUP2,DC=SAMBA,DC=ORG")
1591         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1592                     "name": b"Admins",
1593                     "x": "z", "y": "a",
1594                     "objectUUID": b"0123456789abcde2"})
1595
1596     def test_add_move_fail_move_move(self):
1597         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1598                     "name": b"Admins",
1599                     "x": "z", "y": "a",
1600                     "objectUUID": b"0123456789abcde1"})
1601         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1602                     "name": b"Admins",
1603                     "x": "z", "y": "a",
1604                     "objectUUID": b"0123456789abcde2"})
1605
1606         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1607                              scope=ldb.SCOPE_SUBTREE,
1608                              expression="(objectUUID=0123456789abcde1)")
1609         self.assertEqual(len(res2), 1)
1610         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1611
1612         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1613                              scope=ldb.SCOPE_SUBTREE,
1614                              expression="(objectUUID=0123456789abcde2)")
1615         self.assertEqual(len(res3), 1)
1616         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1617
1618         try:
1619             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1620                           "OU=DUP2,DC=SAMBA,DC=ORG")
1621             self.fail("Should have failed on duplicate DN")
1622         except ldb.LdbError as err:
1623             enum = err.args[0]
1624             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1625
1626         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1627                       "OU=DUP3,DC=SAMBA,DC=ORG")
1628
1629         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1630                       "OU=DUP2,DC=SAMBA,DC=ORG")
1631
1632         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1633                              scope=ldb.SCOPE_SUBTREE,
1634                              expression="(objectUUID=0123456789abcde1)")
1635         self.assertEqual(len(res2), 1)
1636         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1637
1638         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1639                              scope=ldb.SCOPE_SUBTREE,
1640                              expression="(objectUUID=0123456789abcde2)")
1641         self.assertEqual(len(res3), 1)
1642         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1643
1644     def test_move_missing(self):
1645         try:
1646             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1647                           "OU=DUP2,DC=SAMBA,DC=ORG")
1648             self.fail("Should have failed on missing")
1649         except ldb.LdbError as err:
1650             enum = err.args[0]
1651             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1652
1653     def test_move_missing2(self):
1654         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1655                     "name": b"Admins",
1656                     "x": "z", "y": "a",
1657                     "objectUUID": b"0123456789abcde2"})
1658
1659         try:
1660             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1661                           "OU=DUP2,DC=SAMBA,DC=ORG")
1662             self.fail("Should have failed on missing")
1663         except ldb.LdbError as err:
1664             enum = err.args[0]
1665             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1666
1667     def test_move_bad(self):
1668         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1669                     "name": b"Admins",
1670                     "x": "z", "y": "a",
1671                     "objectUUID": b"0123456789abcde2"})
1672
1673         try:
1674             self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
1675                           "OU=DUP2,DC=SAMBA,DC=ORG")
1676             self.fail("Should have failed on invalid DN")
1677         except ldb.LdbError as err:
1678             enum = err.args[0]
1679             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1680
1681     def test_move_bad2(self):
1682         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1683                     "name": b"Admins",
1684                     "x": "z", "y": "a",
1685                     "objectUUID": b"0123456789abcde2"})
1686
1687         try:
1688             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1689                           "OUXDUP2,DC=SAMBA,DC=ORG")
1690             self.fail("Should have failed on missing")
1691         except ldb.LdbError as err:
1692             enum = err.args[0]
1693             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1694
1695     def test_move_fail_move_add(self):
1696         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1697                     "name": b"Admins",
1698                     "x": "z", "y": "a",
1699                     "objectUUID": b"0123456789abcde1"})
1700         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1701                     "name": b"Admins",
1702                     "x": "z", "y": "a",
1703                     "objectUUID": b"0123456789abcde2"})
1704         try:
1705             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1706                           "OU=DUP2,DC=SAMBA,DC=ORG")
1707             self.fail("Should have failed on duplicate DN")
1708         except ldb.LdbError as err:
1709             enum = err.args[0]
1710             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1711
1712         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1713                       "OU=DUP3,DC=SAMBA,DC=ORG")
1714
1715         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1716                     "name": b"Admins",
1717                     "x": "z", "y": "a",
1718                     "objectUUID": b"0123456789abcde3"})
1719
1720
class AddModifyTestsLmdb(AddModifyTests):
    """Run the AddModifyTests cases with MDB_PREFIX and MDB_INDEX_OBJ."""

    def setUp(self):
        # Both attributes are assigned before the inherited setUp() runs.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        parent = super(AddModifyTestsLmdb, self)
        parent.setUp()

    def tearDown(self):
        parent = super(AddModifyTestsLmdb, self)
        parent.tearDown()
1730
1731
class IndexedAddModifyTests(AddModifyTests):
    """Run the add/modify tests with an index configured, to ensure the
       index doesn't break things"""

    def setUp(self):
        """Set a default self.index unless one is already present."""
        default_index = {"dn": "@INDEXLIST",
                         "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                         "@IDXONE": [b"1"]}
        # Subclasses may assign self.index before calling this setUp().
        if not hasattr(self, 'index'):
            self.index = default_index
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        """Adding objectUUID 0123456789abcdef is a constraint violation.

        NOTE(review): presumably the inherited setUp() has already stored
        an entry with this objectUUID -- confirm against AddModifyTests.
        """
        clash = {"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                 "name": b"Admins",
                 "x": "z", "y": "a",
                 "objectUUID": b"0123456789abcdef"}
        try:
            self.l.add(clash)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID(self):
        """Adding the same DN and objectUUID twice: ERR_ENTRY_ALREADY_EXISTS."""
        def admin_entry():
            # A fresh but identical dict for each add() call.
            return {"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"}

        self.l.add(admin_entry())
        try:
            self.l.add(admin_entry())
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID2(self):
        """A rejected duplicate-DN add must not leave its GUID behind."""
        def entry(dn, guid):
            return {"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid}

        admin_dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        self.l.add(entry(admin_dn, b"abc3456789abcdef"))
        try:
            self.l.add(entry(admin_dn, b"aaa3456789abcdef"))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate DN")

        # Checking the GUID didn't stick in the index
        self.l.add(entry("OU=DUP,DC=SAMBA,DC=ORG", b"aaa3456789abcdef"))

    def test_add_dup_guid_add(self):
        """A DN rejected for a duplicate GUID can be added with a new GUID."""
        def entry(dn, guid):
            return {"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid}

        dup2 = "OU=DUP2,DC=SAMBA,DC=ORG"
        self.l.add(entry("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1"))
        try:
            self.l.add(entry(dup2, b"0123456789abcde1"))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        self.l.add(entry(dup2, b"0123456789abcde2"))
1810
1811
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Run the indexed add/modify tests with a GUID-keyed index
       (@IDXGUID / @IDX_DN_GUID) configured"""

    def setUp(self):
        # Assigned before the parent setUp(), which only installs its
        # default index when self.index is not already set.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXONE": [b"1"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedAddModifyTests, self).setUp()
1823
1824
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Each test body runs between transaction_start() and the
        # transaction_commit() issued in tearDown().
        handle = self.l
        handle.transaction_start()

    def tearDown(self):
        handle = self.l
        handle.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
1835
1836
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Each test body runs between transaction_start() and the
        # transaction_commit() issued in tearDown().
        handle = self.l
        handle.transaction_start()

    def tearDown(self):
        handle = self.l
        handle.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
1847
1848
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Run the GUID-indexed add/modify tests with MDB_PREFIX."""

    def setUp(self):
        # The prefix is assigned before the inherited setUp() runs.
        self.prefix = MDB_PREFIX
        parent = super(GuidIndexedAddModifyTestsLmdb, self)
        parent.setUp()

    def tearDown(self):
        parent = super(GuidIndexedAddModifyTestsLmdb, self)
        parent.tearDown()
1857
1858
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Run the in-transaction GUID-indexed tests with MDB_PREFIX."""

    def setUp(self):
        # The prefix is assigned before the inherited setUp() runs.
        self.prefix = MDB_PREFIX
        parent = super(GuidTransIndexedAddModifyTestsLmdb, self)
        parent.setUp()

    def tearDown(self):
        parent = super(GuidTransIndexedAddModifyTestsLmdb, self)
        parent.tearDown()
1867
1868
class BadIndexTests(LdbBaseTest):
    """Check that searches keep working after @ATTRIBUTES is changed.

    Each test stores three entries, then adds an @ATTRIBUTES record
    (UNIQUE_INDEX or CASE_INSENSITIVE) for the already-populated "y"
    attribute and verifies what a "(y=...)" search returns afterwards.
    Subclasses that set ``self.IDXGUID`` before setUp() get the GUID
    index layout instead of the plain one.
    """

    def setUp(self):
        """Create a fresh database file and install its @INDEXLIST record."""
        # The base setUp() is called once, first, and before self.url()
        # and self.flags() are used below.
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def test_unique(self):
        """A rejected UNIQUE_INDEX on duplicated values keeps the index usable."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
            self.fail()
        except ldb.LdbError:
            pass

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        """As test_unique, but the @ATTRIBUTES add runs in a transaction.

        Any LdbError from the add itself is ignored; the commit is what
        must fail, with ERR_OPERATIONS_ERROR.
        """
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
            self.fail()

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        """Switching "y" to CASE_INSENSITIVE changes what "(y=a)" matches."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        """As test_casefold, with the @ATTRIBUTES add inside a transaction."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def tearDown(self):
        # NOTE(review): self.testdir created in setUp() is not removed
        # here -- confirm whether the base class cleans it up.
        super(BadIndexTests, self).tearDown()
2018
2019
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # The parent setUp() only checks that this attribute exists
        # (hasattr), so it has to be assigned before delegating.
        setattr(self, 'IDXGUID', True)
        parent = super(GUIDBadIndexTests, self)
        parent.setUp()
2027
2028
class DnTests(TestCase):
    """Tests for ``ldb.Dn``, using an ``ldb.Ldb()`` created without a URL."""

    def setUp(self):
        """Create the Ldb handle every Dn in these tests is built from."""
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        """Drop the reference to the Ldb handle created in setUp()."""
        super(DnTests, self).tearDown()
        del self.ldb

    def test_set_dn_invalid(self):
        """Assigning a plain string to ``Message.dn`` raises TypeError."""
        x = ldb.Message()

        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        """Dns compare equal when built from the same string, else unequal."""
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        """``__str__`` returns the string the Dn was built from."""
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")

    def test_repr(self):
        """``__repr__`` has the form ``Dn('<dn string>')``."""
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold(self):
        """Casefolding "dc=foo14,bar=bloe" gives "DC=FOO14,BAR=bloe"."""
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        """``validate()`` is true for a well-formed DN."""
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        """``parent()`` drops the leftmost component."""
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", x.parent().__str__())

    def test_parent_nonexistent(self):
        """``parent()`` of the special DN "@BLA" is None."""
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        """``is_valid()`` is true for a normal DN and for the empty DN."""
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        """``is_special()`` is true for "@FOOBAR", false for a normal DN."""
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        """``check_special()`` matches a special DN against its own name."""
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        """``len()`` of a Dn is its number of components."""
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        """``add_child()`` with a Dn prepends it and returns a true value."""
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base(self):
        """``add_base()`` with a Dn appends it and returns a true value."""
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add_child_str(self):
        """``add_child()`` also accepts the child as a plain string."""
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base_str(self):
        """``add_base()`` also accepts the base as a plain string."""
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add(self):
        """``x + y`` yields x's components followed by y's."""
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        """Removing all but one base component leaves the leftmost one."""
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x) - 1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        """``parse_ldif()`` yields tuples whose item 1 is the Message.

        The parsed message is also written back with ``write_ldif()``.
        """
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        """``parse_ldif()`` yields one item per record in the input."""
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        """``canonical_str()`` of "dc=foo25,bar=bloe" is "/bloe/foo25"."""
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        """``canonical_ex_str()`` puts a newline before the last element."""
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn via ``is_child_of()`` with Dn arguments.

        A Dn counts as a child of itself and of every ancestor, but not
        of a sibling or of its own descendant.
        """
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn via ``is_child_of()`` with str arguments."""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        """Component names by index; out-of-range indices give None."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        """Component values by index; out-of-range indices give None."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range lookups go through get_component_value() too;
        # previously these two lines re-tested get_component_name().
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        """``set_component()`` replaces one component in place.

        An out-of-range index raises TypeError and leaves the Dn
        unchanged; ',' and '+' in a value come back escaped in str().
        """
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        """``set_component()`` accepts the value as bytes."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        """``set_component()`` with a None value raises TypeError."""
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        """An extended component that is not present is returned as None."""
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        """An extended component parsed from the DN string comes back as bytes."""
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        """``set_extended_component()`` takes str or bytes; get returns bytes."""
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        """``extended_str()`` includes the extended component prefix."""
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        """``get_rdn_name()`` is the name of the leftmost component."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        """``get_rdn_value()`` is the value of the leftmost component."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    # Renamed from test_get_casefold: the class already defines a method of
    # that name, and a second definition silently replaced the first one so
    # that it never ran.
    def test_get_casefold_2(self):
        """Casefolding "cn=foo,dc=base" gives "CN=FOO,DC=BASE"."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        """``get_linearized()`` returns the DN string unchanged here."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        """``is_null()`` is true only for the empty DN."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
2271
2272
2273 class LdbMsgTests(TestCase):
2274
2275     def setUp(self):
2276         super(LdbMsgTests, self).setUp()
2277         self.msg = ldb.Message()
2278
2279     def test_init_dn(self):
2280         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
2281         self.assertEqual("dc=foo27", str(self.msg.dn))
2282
2283     def test_iter_items(self):
2284         self.assertEqual(0, len(self.msg.items()))
2285         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
2286         self.assertEqual(1, len(self.msg.items()))
2287
2288     def test_repr(self):
2289         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2290         self.msg["dc"] = b"foo"
2291         if PY3:
2292             self.assertIn(repr(self.msg), [
2293                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2294                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2295             ])
2296             self.assertIn(repr(self.msg.text), [
2297                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2298                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2299             ])
2300         else:
2301             self.assertEqual(
2302                 repr(self.msg),
2303                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
2304             self.assertEqual(
2305                 repr(self.msg.text),
2306                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
2307
2308     def test_len(self):
2309         self.assertEqual(0, len(self.msg))
2310
2311     def test_notpresent(self):
2312         self.assertRaises(KeyError, lambda: self.msg["foo"])
2313
2314     def test_del(self):
2315         del self.msg["foo"]
2316
2317     def test_add(self):
2318         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2319
2320     def test_add_text(self):
2321         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2322
2323     def test_elements_empty(self):
2324         self.assertEqual([], self.msg.elements())
2325
2326     def test_elements(self):
2327         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2328         self.msg.add(el)
2329         self.assertEqual([el], self.msg.elements())
2330         self.assertEqual([el.text], self.msg.text.elements())
2331
2332     def test_add_value(self):
2333         self.assertEqual(0, len(self.msg))
2334         self.msg["foo"] = [b"foo"]
2335         self.assertEqual(1, len(self.msg))
2336
2337     def test_add_value_text(self):
2338         self.assertEqual(0, len(self.msg))
2339         self.msg["foo"] = ["foo"]
2340         self.assertEqual(1, len(self.msg))
2341
2342     def test_add_value_multiple(self):
2343         self.assertEqual(0, len(self.msg))
2344         self.msg["foo"] = [b"foo", b"bla"]
2345         self.assertEqual(1, len(self.msg))
2346         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2347
2348     def test_add_value_multiple_text(self):
2349         self.assertEqual(0, len(self.msg))
2350         self.msg["foo"] = ["foo", "bla"]
2351         self.assertEqual(1, len(self.msg))
2352         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2353
2354     def test_set_value(self):
2355         self.msg["foo"] = [b"fool"]
2356         self.assertEqual([b"fool"], list(self.msg["foo"]))
2357         self.msg["foo"] = [b"bar"]
2358         self.assertEqual([b"bar"], list(self.msg["foo"]))
2359
2360     def test_set_value_text(self):
2361         self.msg["foo"] = ["fool"]
2362         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2363         self.msg["foo"] = ["bar"]
2364         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2365
2366     def test_keys(self):
2367         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2368         self.msg["foo"] = [b"bla"]
2369         self.msg["bar"] = [b"bla"]
2370         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
2371
2372     def test_keys_text(self):
2373         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2374         self.msg["foo"] = ["bla"]
2375         self.msg["bar"] = ["bla"]
2376         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
2377
2378     def test_dn(self):
2379         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2380         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2381
2382     def test_get_dn(self):
2383         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2384         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2385
2386     def test_dn_text(self):
2387         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2388         self.assertEqual("@BASEINFO", str(self.msg.dn))
2389         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2390
2391     def test_get_dn_text(self):
2392         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2393         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2394         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2395
2396     def test_get_invalid(self):
2397         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2398         self.assertRaises(TypeError, self.msg.get, 42)
2399
2400     def test_get_other(self):
2401         self.msg["foo"] = [b"bar"]
2402         self.assertEqual(b"bar", self.msg.get("foo")[0])
2403         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2404         self.assertEqual(None, self.msg.get("foo", idx=1))
2405         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2406
2407     def test_get_other_text(self):
2408         self.msg["foo"] = ["bar"]
2409         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2410         self.assertEqual("bar", self.msg.text.get("foo")[0])
2411         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2412         self.assertEqual(None, self.msg.get("foo", idx=1))
2413         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2414
2415     def test_get_default(self):
2416         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2417         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2418
2419     def test_get_default_text(self):
2420         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2421         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2422
2423     def test_get_unknown(self):
2424         self.assertEqual(None, self.msg.get("lalalala"))
2425
2426     def test_get_unknown_text(self):
2427         self.assertEqual(None, self.msg.text.get("lalalala"))
2428
2429     def test_msg_diff(self):
2430         l = ldb.Ldb()
2431         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2432         msg1 = next(msgs)[1]
2433         msg2 = next(msgs)[1]
2434         msgdiff = l.msg_diff(msg1, msg2)
2435         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2436         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2437         self.assertEqual(1, len(msgdiff))
2438
2439     def test_equal_empty(self):
2440         msg1 = ldb.Message()
2441         msg2 = ldb.Message()
2442         self.assertEqual(msg1, msg2)
2443
2444     def test_equal_simplel(self):
2445         db = ldb.Ldb()
2446         msg1 = ldb.Message()
2447         msg1.dn = ldb.Dn(db, "foo=bar")
2448         msg2 = ldb.Message()
2449         msg2.dn = ldb.Dn(db, "foo=bar")
2450         self.assertEqual(msg1, msg2)
2451         msg1['foo'] = b'bar'
2452         msg2['foo'] = b'bar'
2453         self.assertEqual(msg1, msg2)
2454         msg2['foo'] = b'blie'
2455         self.assertNotEqual(msg1, msg2)
2456         msg2['foo'] = b'blie'
2457
2458     def test_from_dict(self):
2459         rec = {"dn": "dc=fromdict",
2460                "a1": [b"a1-val1", b"a1-val1"]}
2461         l = ldb.Ldb()
2462         # check different types of input Flags
2463         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2464             m = ldb.Message.from_dict(l, rec, flags)
2465             self.assertEqual(rec["a1"], list(m["a1"]))
2466             self.assertEqual(flags, m["a1"].flags())
2467         # check input params
2468         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2469         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2470         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2471         # Message.from_dict expects dictionary with 'dn'
2472         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2473         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2474
2475     def test_from_dict_text(self):
2476         rec = {"dn": "dc=fromdict",
2477                "a1": ["a1-val1", "a1-val1"]}
2478         l = ldb.Ldb()
2479         # check different types of input Flags
2480         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2481             m = ldb.Message.from_dict(l, rec, flags)
2482             self.assertEqual(rec["a1"], list(m.text["a1"]))
2483             self.assertEqual(flags, m.text["a1"].flags())
2484         # check input params
2485         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2486         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2487         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2488         # Message.from_dict expects dictionary with 'dn'
2489         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2490         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2491
2492     def test_copy_add_message_element(self):
2493         m = ldb.Message()
2494         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2495         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2496         mto = ldb.Message()
2497         mto["1"] = m["1"]
2498         mto["2"] = m["2"]
2499         self.assertEqual(mto["1"], m["1"])
2500         self.assertEqual(mto["2"], m["2"])
2501         mto = ldb.Message()
2502         mto.add(m["1"])
2503         mto.add(m["2"])
2504         self.assertEqual(mto["1"], m["1"])
2505         self.assertEqual(mto["2"], m["2"])
2506
2507     def test_copy_add_message_element_text(self):
2508         m = ldb.Message()
2509         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2510         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2511         mto = ldb.Message()
2512         mto["1"] = m["1"]
2513         mto["2"] = m["2"]
2514         self.assertEqual(mto["1"], m.text["1"])
2515         self.assertEqual(mto["2"], m.text["2"])
2516         mto = ldb.Message()
2517         mto.add(m["1"])
2518         mto.add(m["2"])
2519         self.assertEqual(mto.text["1"], m.text["1"])
2520         self.assertEqual(mto.text["2"], m.text["2"])
2521         self.assertEqual(mto["1"], m["1"])
2522         self.assertEqual(mto["2"], m["2"])
2523
2524
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement: equality, indexing, repr and .text."""

    def test_cmp_element(self):
        # Elements with the same values are equal, others are not.
        first = ldb.MessageElement([b"foo"])
        same = ldb.MessageElement([b"foo"])
        different = ldb.MessageElement([b"bzr"])
        self.assertEqual(first, same)
        self.assertNotEqual(first, different)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from the same text.
        from_bytes = ldb.MessageElement([b"foo"])
        from_text = ldb.MessageElement(["foo"])
        self.assertEqual(from_bytes, from_text)

    def test_create_iterable(self):
        # Iterating the element yields bytes; iterating .text yields text.
        element = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(element))
        self.assertEqual(["foo"], list(element.text))

    def test_repr(self):
        # The expected repr differs between Python 2 and 3 only by the
        # b prefix on the values; the .text view appends ".text".
        element = ldb.MessageElement([b"foo"])
        if PY3:
            expected = "MessageElement([b'foo'])"
        else:
            expected = "MessageElement(['foo'])"
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

        element = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(element))
        if PY3:
            expected = "MessageElement([b'foo',b'bla'])"
        else:
            expected = "MessageElement(['foo','bla'])"
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

    def test_get_item(self):
        # Positive and negative indexes work; out of range is IndexError.
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", element[0])
        self.assertEqual(b"bar", element[1])
        self.assertEqual(b"bar", element[-1])
        with self.assertRaises(IndexError):
            element[45]

    def test_get_item_text(self):
        # Indexing through the .text view returns text values.
        element = ldb.MessageElement(["foo", "bar"])
        self.assertEqual("foo", element.text[0])
        self.assertEqual("bar", element.text[1])
        self.assertEqual("bar", element.text[-1])
        # The out-of-range check indexes the element itself, not .text.
        with self.assertRaises(IndexError):
            element[45]

    def test_len(self):
        # len() reports the number of values.
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(element))

    def test_eq(self):
        # Equality tracks the full value list on both sides.
        left = ldb.MessageElement([b"foo", b"bar"])
        right = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(right, left)
        # Shrinking one side breaks equality ...
        left = ldb.MessageElement([b"foo"])
        self.assertNotEqual(right, left)
        # ... and shrinking the other side restores it.
        right = ldb.MessageElement([b"foo"])
        self.assertEqual(right, left)

    def test_extended(self):
        # The flags and name given to the constructor do not show up in
        # the expected repr.
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if PY3:
            expected = "MessageElement([b'456'])"
        else:
            expected = "MessageElement(['456'])"
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

    def test_bad_text(self):
        # Bytes that cannot be decoded make .text indexing raise
        # UnicodeDecodeError; taking the .text view itself must not raise.
        element = ldb.MessageElement(b'\xba\xdd')
        text_view = element.text
        with self.assertRaises(UnicodeDecodeError):
            text_view[0]
2600
2601
class ModuleTests(TestCase):
    """Tests for registering and loading Python-implemented ldb modules."""

    def setUp(self):
        # Each test gets its own scratch directory holding one database.
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        # Remove the scratch directory, then chain to the parent's
        # tearDown (the original mistakenly called setUp() here).
        shutil.rmtree(self.testdir)
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        # A class with only a "name" attribute can be registered.
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records every call to ExampleModule.__init__.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                # Forward searches unchanged to the next module.
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        # Writing the @MODULES record does not instantiate the module ...
        l = ldb.Ldb(self.filename)
        l.add({"dn": "@MODULES", "@LIST": "bla"})
        self.assertEqual([], ops)
        # ... but re-opening the database afterwards does, exactly once.
        l = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2641
2642
class LdbResultTests(LdbBaseTest):
    """Tests for search results against a small, freshly populated database.

    The database is opened with self.url() and self.flags(); if the
    instance has an ``index`` attribute it is added before any record.
    """

    def setUp(self):
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        # Only subclasses that define self.index get an index record.
        try:
            self.l.add(self.index)
        except AttributeError:
            pass
        self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
                    "objectUUID": b"0123456789abcde0"})
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
                    "objectUUID": b"0123456789abcde1"})
        self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
                    "objectUUID": b"0123456789abcde2"})
        self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
                    "objectUUID": b"0123456789abcde3"})
        self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
                    "objectUUID": b"0123456789abcde4"})
        self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
                    "objectUUID": b"0123456789abcde5"})
        self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
                    "objectUUID": b"0123456789abcde6"})
        self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
                    "objectUUID": b"0123456789abcde7"})
        self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
                    "objectUUID": b"0123456789abcde8"})
        self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
                    "objectUUID": b"0123456789abcde9"})
        self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
                    "objectUUID": b"0123456789abcdea"})
        self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
                    "objectUUID": b"0123456789abcdeb"})
        self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
                    "objectUUID": b"0123456789abcdec"})

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def test_return_type(self):
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        # Smoke test: reading the attribute must not raise.
        res = self.l.search()
        msgs = res.msgs

    def test_get_controls(self):
        # Smoke test: reading the attribute must not raise.
        res = self.l.search()
        controls = res.controls

    def test_get_referals(self):
        # Smoke test: reading the attribute must not raise.
        res = self.l.search()
        referals = res.referals

    def test_iter_msgs(self):
        # The last record added in setUp is found by iterating .msgs.
        found = False
        for l in self.l.search().msgs:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG (see setUp)
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        # Smoke test: the controls collection must be iterable.
        res = self.l.search().controls
        it = iter(res)

    def test_create_control(self):
        # An unknown control name is rejected; "relax:1" parses to a
        # critical control with the expected OID.
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        # Smoke test: the referals collection must be iterable.
        res = self.l.search().referals
        it = iter(res)

    def test_search_sequence_msgs(self):
        found = False
        res = self.l.search().msgs

        # Index explicitly: this test exercises len() and item access on
        # the msgs sequence rather than iteration.
        for i in range(0, len(res)):
            l = res[i]
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        # The result object itself can be iterated.
        found = False
        res = self.l.search()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        # search_iterator() yields the same records lazily.
        found = False
        res = self.l.search_iterator()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction

    def test_search_against_trans(self):
        found11 = False

        # r1/w1: child -> parent notifications ("added", "transaction").
        (r1, w1) = os.pipe()

        # r2/w2: parent -> child notification ("search").
        (r2, w2) = os.pipe()

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertFalse(found11)

        # Reap the child so it does not linger as a zombie.
        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1: child -> parent notifications ("added", "transaction").
        (r1, w1) = os.pipe()

        # r2/w2: parent -> child notification ("search").
        (r2, w2) = os.pipe()

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(res)
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        # Reap the child so it does not linger as a zombie.
        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
2910
2911
class LdbResultTestsLmdb(LdbResultTests):
    """Runs the LdbResultTests cases with the mdb:// prefix and its index."""

    def setUp(self):
        # Both attributes must be in place before the parent setUp runs,
        # because the parent setUp opens the database and adds self.index.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing extra to clean up; defer entirely to the parent.
        super(LdbResultTestsLmdb, self).tearDown()
2921
2922
class BadTypeTests(TestCase):
    """Checks that wrongly typed arguments are rejected with TypeError."""

    def test_control(self):
        l = ldb.Ldb()
        # First argument is not an Ldb handle.
        self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
        # Valid handle, but the control data is not a string.  (The
        # original passed the ldb module here instead of the handle, so
        # the data argument was never the thing being rejected.)
        self.assertRaises(TypeError, ldb.Control, l, 1234)

    def test_modify(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        m = ldb.Message(dn)
        # Bad message, then a good message with bad controls.
        self.assertRaises(TypeError, l.modify, '<bad type>')
        self.assertRaises(TypeError, l.modify, m, '<bad type>')

    def test_add(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        m = ldb.Message(dn)
        # Bad message, then a good message with bad controls.
        self.assertRaises(TypeError, l.add, '<bad type>')
        self.assertRaises(TypeError, l.add, m, '<bad type>')

    def test_delete(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        # NOTE(review): despite the test name these calls exercise l.add,
        # not l.delete -- looks like a copy/paste slip.  Left unchanged
        # until the TypeError behaviour of l.delete is confirmed.
        self.assertRaises(TypeError, l.add, '<bad type>')
        self.assertRaises(TypeError, l.add, dn, '<bad type>')

    def test_rename(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        # NOTE(review): despite the test name these calls exercise l.add,
        # not l.rename -- looks like a copy/paste slip.  Left unchanged
        # until the TypeError behaviour of l.rename is confirmed.
        self.assertRaises(TypeError, l.add, '<bad type>', dn)
        self.assertRaises(TypeError, l.add, dn, '<bad type>')
        self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')

    def test_search(self):
        l = ldb.Ldb()
        # Each keyword argument is given a value of the wrong type.
        self.assertRaises(TypeError, l.search, base=1234)
        self.assertRaises(TypeError, l.search, scope='<bad type>')
        self.assertRaises(TypeError, l.search, expression=1234)
        self.assertRaises(TypeError, l.search, attrs='<bad type>')
        self.assertRaises(TypeError, l.search, controls='<bad type>')
2963
2964
class VersionTests(TestCase):
    """Checks the version attribute exported by the ldb module."""

    def test_version(self):
        # ldb.__version__ must be a str.
        self.assertIsInstance(ldb.__version__, str)
2969
2970
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line
    # test program.
    from unittest import TestProgram
    TestProgram()