471d70fc5218c126fff6c29d8859dfe7cbf18dca
[samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the interpreter's major version is 3 or newer.
PY3 = sys.version_info[0] >= 3

# URL scheme prefixes selecting the database backend for a test.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"

# The "@INDEXLIST" record that the lmdb-backed test classes add to a new
# database before running (they assign it to ``self.index``).
MDB_INDEX_OBJ = {
    "dn": "@INDEXLIST",
    "@IDXONE": [b"1"],
    "@IDXGUID": [b"objectUUID"],
    "@IDX_DN_GUID": [b"GUID"],
}
24
def tempdir():
    """Create a fresh temporary directory and return its path.

    If the SELFTEST_PREFIX environment variable is set, the directory is
    created inside ``$SELFTEST_PREFIX/tmp``; otherwise ``tempfile.mkdtemp``
    picks its default location.
    """
    import tempfile

    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
32
33
class NoContextTests(TestCase):
    """Tests for module-level ldb helpers that need no open database."""

    def test_valid_attr_name(self):
        # A plain identifier is accepted, a name starting with digits is not.
        accepted = ldb.valid_attr_name("foo")
        rejected = ldb.valid_attr_name("24foo")
        self.assertTrue(accepted)
        self.assertFalse(rejected)

    def test_timestring(self):
        # Seconds since the epoch -> time string.
        expectations = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, text in expectations:
            self.assertEqual(text, ldb.timestring(seconds))

    def test_string_to_time(self):
        # Time string -> seconds since the epoch (inverse of test_timestring).
        expectations = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for text, seconds in expectations:
            self.assertEqual(seconds, ldb.string_to_time(text))

    def test_binary_encode(self):
        # Encoding followed by decoding must give back the original bytes.
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        self.assertEqual(ldb.binary_decode(from_bytes), raw)

        # A text argument must encode to the same value as the bytes one.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
55
56
class LdbBaseTest(TestCase):
    """Shared base for the ldb tests: selects the backend URL and flags.

    Subclasses may assign ``self.prefix`` before calling ``setUp()``; when
    it is missing or ``None`` the tdb prefix is used.  ``url()`` also needs
    ``self.filename``, which subclasses set in their own ``setUp()``.
    """

    def setUp(self):
        super(LdbBaseTest, self).setUp()
        # Fall back to tdb when no backend prefix has been chosen.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the database URL: backend prefix followed by the file name."""
        return "".join((self.prefix, self.filename))

    def flags(self):
        """Return the connect flags: FLG_NOSYNC for the mdb prefix, else 0."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
77
78
class SimpleLdb(LdbBaseTest):
    """Basic API tests run against a freshly created on-disk database.

    ``setUp()`` creates a temporary directory containing ``test.ldb`` and
    opens it through ``self.url()`` / ``self.flags()``, so the same tests
    run against whichever backend ``self.prefix`` selects.  A subclass that
    assigns ``self.index`` before calling ``setUp()`` gets that record
    added to the new database first.

    Every message added here carries an ``objectUUID`` attribute; that is
    the attribute MDB_INDEX_OBJ names under ``@IDXGUID`` (presumably what
    the lmdb-backed variant needs -- confirm against the backend).
    """

    def setUp(self):
        super(SimpleLdb, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        # Only subclasses that define an index record get one added.  The
        # attribute is checked explicitly so that an AttributeError raised
        # inside add() is reported instead of silently ignored.
        if hasattr(self, "index"):
            self.ldb.add(self.index)

    def tearDown(self):
        # Drop our handle first so the database (and its FD) is closed
        # before the files underneath it are removed.
        del self.ldb
        shutil.rmtree(self.testdir)
        super(SimpleLdb, self).tearDown()

    # -- connecting and introspection ------------------------------------

    def test_connect(self):
        ldb.Ldb(self.url(), flags=self.flags())

    def test_connect_none(self):
        # An Ldb object can be created without a URL.
        ldb.Ldb()

    def test_connect_later(self):
        x = ldb.Ldb()
        x.connect(self.url(), flags=self.flags())

    def test_repr(self):
        x = ldb.Ldb()
        self.assertTrue(repr(x).startswith("<ldb connection"))

    def test_set_create_perms(self):
        x = ldb.Ldb()
        x.set_create_perms(0o600)

    def test_modules_none(self):
        # An unconnected Ldb has no modules loaded.
        x = ldb.Ldb()
        self.assertEqual([], x.modules())

    def test_modules_tdb(self):
        x = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))

    def test_firstmodule_none(self):
        x = ldb.Ldb()
        self.assertEqual(x.firstmodule, None)

    def test_firstmodule_tdb(self):
        x = ldb.Ldb(self.url(), flags=self.flags())
        mod = x.firstmodule
        self.assertEqual(repr(mod), "<ldb module 'tdb'>")

    # -- searching an empty database -------------------------------------

    def test_search(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search()), 0)

    def test_search_controls(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)

    def test_search_attrs(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(
            len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])),
            0)

    def test_search_string_dn(self):
        # The base may be given as a plain string instead of an ldb.Dn.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(
            len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)

    def test_search_attr_string(self):
        # attrs must be a list; a bare str or bytes value is a TypeError.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(TypeError, l.search, attrs="dc")
        self.assertRaises(TypeError, l.search, attrs=b"dc")

    def test_opaque(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.set_opaque("my_opaque", l)
        self.assertTrue(l.get_opaque("my_opaque") is not None)
        self.assertEqual(None, l.get_opaque("unknown"))

    def test_search_scope_base_empty_db(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
                                      ldb.SCOPE_BASE)), 0)

    def test_search_scope_onelevel_empty_db(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
                                      ldb.SCOPE_ONELEVEL)), 0)

    # -- delete / contains / base DNs ------------------------------------

    def test_delete(self):
        # Deleting a record that does not exist is an error.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(ldb.LdbError,
                          lambda: l.delete(ldb.Dn(l, "dc=foo2")))

    def test_delete_w_unhandled_ctrl(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo1")
        m["b"] = [b"a"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        self.assertRaises(ldb.LdbError,
                          lambda: l.delete(m.dn, ["search_options:1:2"]))
        # The record must still be there for a plain delete to succeed.
        l.delete(m.dn)

    def test_contains(self):
        name = self.url()
        l = ldb.Ldb(name, flags=self.flags())
        self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
        l = ldb.Ldb(name, flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo3")
        m["b"] = ["a"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
            self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
        finally:
            l.delete(m.dn)

    def test_get_config_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_config_basedn())

    def test_get_root_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_root_basedn())

    def test_get_schema_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_schema_basedn())

    def test_get_default_basedn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_default_basedn())

    # -- add --------------------------------------------------------------

    def test_add(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo4"))

    def test_search_iterator(self):
        l = ldb.Ldb(self.url(), flags=self.flags())

        def collect(iterator):
            # Exhaust the iterator, checking the type of every item.
            found = []
            for me in iterator:
                self.assertIsInstance(me, ldb.Message)
                found.append(me)
            return found

        # An abandoned iterator refuses iteration, a second abandon() and
        # result().
        s = l.search_iterator()
        s.abandon()
        with self.assertRaises(RuntimeError):
            for me in s:
                self.fail()
        self.assertRaises(RuntimeError, s.abandon)
        self.assertRaises(RuntimeError, s.result)

        # Empty database: nothing is yielded and result() is empty.
        s = l.search_iterator()
        msgs = collect(s)
        self.assertEqual(len(s.result()), 0)
        self.assertEqual(len(msgs), 0)

        m1 = ldb.Message()
        m1.dn = ldb.Dn(l, "dc=foo4")
        m1["bla"] = b"bla"
        m1["objectUUID"] = b"0123456789abcdef"

        m2 = ldb.Message()
        m2.dn = ldb.Dn(l, "dc=foo5")
        m2["bla"] = b"bla"
        m2["objectUUID"] = b"0123456789abcdee"

        def assert_both_returned(found):
            # Both records must come back; their order is not fixed.
            self.assertEqual(len(found), 2)
            if found[0].dn == m1.dn:
                self.assertEqual(found[0].dn, m1.dn)
                self.assertEqual(found[1].dn, m2.dn)
            else:
                self.assertEqual(found[0].dn, m2.dn)
                self.assertEqual(found[1].dn, m1.dn)

        # Remember what was really added so that cleanup after an early
        # failure does not raise on a record that never existed and hide
        # the original error.
        added = []
        try:
            l.add(m1)
            added.append(m1.dn)

            s = l.search_iterator()
            msgs = collect(s)
            self.assertEqual(len(s.result()), 0)
            self.assertEqual(len(msgs), 1)
            self.assertEqual(msgs[0].dn, m1.dn)

            l.add(m2)
            added.append(m2.dn)

            s = l.search_iterator()
            msgs = collect(s)
            self.assertEqual(len(s.result()), 0)
            assert_both_returned(msgs)

            # Consume the iterator one message at a time.
            s = l.search_iterator()
            msgs = []
            for me in s:
                self.assertIsInstance(me, ldb.Message)
                msgs.append(me)
                break
            # result() is refused while messages are still pending.
            self.assertRaises(RuntimeError, s.result)
            for me in s:
                self.assertIsInstance(me, ldb.Message)
                msgs.append(me)
                break
            for me in s:
                self.fail()

            self.assertEqual(len(s.result()), 0)
            assert_both_returned(msgs)
        finally:
            for dn in added:
                l.delete(dn)

    def test_add_text(self):
        # Same as test_add, but with a text (str) attribute value.
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = "bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo4"))

    def test_add_w_unhandled_ctrl(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = b"bla"
        self.assertEqual(len(l.search()), 0)
        self.assertRaises(ldb.LdbError,
                          lambda: l.add(m, ["search_options:1:2"]))

    def test_add_dict(self):
        # add() also accepts a plain dict with a "dn" key.
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": ldb.Dn(l, "dc=foo5"),
             "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo5"))

    def test_add_dict_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": ldb.Dn(l, "dc=foo5"),
             "bla": "bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo5"))

    def test_add_dict_string_dn(self):
        # The "dn" value of the dict may be a str ...
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": "dc=foo6", "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo6"))

    def test_add_dict_bytes_dn(self):
        # ... or bytes.
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": b"dc=foo6", "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo6"))

    # -- rename -----------------------------------------------------------

    def test_rename(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo7")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=bar"))

    def test_rename_string_dns(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo8")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        self.assertEqual(len(l.search()), 1)
        try:
            l.rename("dc=foo8", "dc=bar")
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=bar"))

    def test_rename_bad_string_dns(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo8")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        self.assertEqual(len(l.search()), 1)
        # A malformed DN on either side must be rejected.
        self.assertRaises(ldb.LdbError,
                          lambda: l.rename("dcXfoo8", "dc=bar"))
        self.assertRaises(ldb.LdbError,
                          lambda: l.rename("dc=foo8", "dcXbar"))
        # The record must still exist under its original DN.
        l.delete(ldb.Dn(l, "dc=foo8"))

    def test_empty_dn(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(0, len(l.search()))
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=empty")
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search()
        self.assertEqual(1, len(rm))
        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                         set(rm[0].keys()))

        rm = l.search(m.dn)
        self.assertEqual(1, len(rm))
        self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                         set(rm[0].keys()))
        # Asking only for an attribute the record lacks gives an empty
        # message, not zero results.
        rm = l.search(m.dn, attrs=["blah"])
        self.assertEqual(1, len(rm))
        self.assertEqual(0, len(rm[0]))

    # -- modify -----------------------------------------------------------

    def test_modify_delete(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modifydelete")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search(m.dn)[0]
        self.assertEqual([b"1234"], list(rm["bla"]))
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modifydelete")
            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)
            self.assertEqual(1, len(rm))
            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                             set(rm[0].keys()))
            rm = l.search(m.dn, attrs=["bla"])
            self.assertEqual(1, len(rm))
            self.assertEqual(0, len(rm[0]))
        finally:
            l.delete(ldb.Dn(l, "dc=modifydelete"))

    def test_modify_delete_text(self):
        # Same as test_modify_delete, going through the .text view.
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modifydelete")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search(m.dn)[0]
        self.assertEqual(["1234"], list(rm.text["bla"]))
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modifydelete")
            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)
            self.assertEqual(1, len(rm))
            self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
                             set(rm[0].keys()))
            rm = l.search(m.dn, attrs=["bla"])
            self.assertEqual(1, len(rm))
            self.assertEqual(0, len(rm[0]))
        finally:
            l.delete(ldb.Dn(l, "dc=modifydelete"))

    def test_modify_add(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_add_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_replace(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modify2")
        m["bla"] = [b"1234", b"456"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modify2")
            m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE,
                                          "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"789"], list(rm["bla"]))
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
        finally:
            l.delete(ldb.Dn(l, "dc=modify2"))

    def test_modify_replace_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modify2")
        m.text["bla"] = ["1234", "456"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modify2")
            m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE,
                                          "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["789"], list(rm.text["bla"]))
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
        finally:
            l.delete(ldb.Dn(l, "dc=modify2"))

    def test_modify_flags_change(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"1234", b"456"], list(rm["bla"]))

            # Now create another modify, but switch the flags before we do it
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            l.modify(m)
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
            self.assertEqual([b"1234"], list(rm["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_flags_change_text(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["1234", "456"], list(rm.text["bla"]))

            # Now create another modify, but switch the flags before we do it
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            l.modify(m)
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
            self.assertEqual(["1234"], list(rm.text["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    # -- transactions and miscellany ------------------------------------

    def test_transaction_commit(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.transaction_start()
        m = ldb.Message(ldb.Dn(l, "dc=foo9"))
        m["foo"] = [b"bar"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        l.transaction_commit()
        # The delete only succeeds if the committed record really exists.
        l.delete(m.dn)

    def test_transaction_cancel(self):
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.transaction_start()
        m = ldb.Message(ldb.Dn(l, "dc=foo10"))
        m["foo"] = [b"bar"]
        m["objectUUID"] = b"0123456789abcdee"
        l.add(m)
        l.transaction_cancel()
        # The cancelled add must not be visible.
        self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))

    def test_set_debug(self):
        def my_report_fn(level, text):
            pass
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.set_debug(my_report_fn)

    def test_zero_byte_string(self):
        r"""Testing we do not get trapped in the \0 byte in a property string."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.add({
            "dn": b"dc=somedn",
            "objectclass": b"user",
            "cN": b"LDAPtestUSER",
            "givenname": b"ldap",
            "displayname": b"foo\0bar",
            "objectUUID": b"0123456789abcdef"
        })
        res = l.search(expression="(dn=dc=somedn)")
        self.assertEqual(b"foo\0bar", res[0]["displayname"][0])

    def test_no_crash_broken_expr(self):
        # A malformed filter must raise LdbError rather than crash.
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(
            ldb.LdbError,
            lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
686
687 # Run the SimpleLdb tests against an lmdb backend
class SimpleLdbLmdb(SimpleLdb):
    """SimpleLdb test suite re-run with the mdb:// prefix and index record."""

    def setUp(self):
        # Both attributes must be in place before the parent setUp() runs,
        # because that is where the database is opened and the index added.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SimpleLdbLmdb, self).setUp()

    def tearDown(self):
        super(SimpleLdbLmdb, self).tearDown()
697
698 class SearchTests(LdbBaseTest):
699     def tearDown(self):
700         shutil.rmtree(self.testdir)
701         super(SearchTests, self).tearDown()
702
703         # Ensure the LDB is closed now, so we close the FD
704         del(self.l)
705
706
707     def setUp(self):
708         super(SearchTests, self).setUp()
709         self.testdir = tempdir()
710         self.filename = os.path.join(self.testdir, "search_test.ldb")
711         options = ["modules:rdn_name"]
712         if hasattr(self, 'IDXCHECK'):
713             options.append("disable_full_db_scan_for_self_test:1")
714         self.l = ldb.Ldb(self.url(),
715                          flags=self.flags(),
716                          options=options)
717         try:
718             self.l.add(self.index)
719         except AttributeError:
720             pass
721
722         self.l.add({"dn": "@ATTRIBUTES",
723                     "DC": "CASE_INSENSITIVE"})
724
725         # Note that we can't use the name objectGUID here, as we
726         # want to stay clear of the objectGUID handler in LDB and
727         # instead use just the 16 bytes raw, which we just keep
728         # to printable chars here for ease of handling.
729
730         self.l.add({"dn": "DC=SAMBA,DC=ORG",
731                     "name": b"samba.org",
732                     "objectUUID": b"0123456789abcdef"})
733         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
734                     "name": b"Admins",
735                     "x": "z", "y": "a",
736                     "objectUUID": b"0123456789abcde1"})
737         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
738                     "name": b"Users",
739                     "x": "z", "y": "a",
740                     "objectUUID": b"0123456789abcde2"})
741         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
742                     "name": b"OU #1",
743                     "x": "y", "y": "a",
744                     "objectUUID": b"0123456789abcde3"})
745         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
746                     "name": b"OU #2",
747                     "x": "y", "y": "a",
748                     "objectUUID": b"0123456789abcde4"})
749         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
750                     "name": b"OU #3",
751                     "x": "y", "y": "a",
752                     "objectUUID": b"0123456789abcde5"})
753         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
754                     "name": b"OU #4",
755                     "x": "y", "y": "a",
756                     "objectUUID": b"0123456789abcde6"})
757         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
758                     "name": b"OU #5",
759                     "x": "y", "y": "a",
760                     "objectUUID": b"0123456789abcde7"})
761         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
762                     "name": b"OU #6",
763                     "x": "y", "y": "a",
764                     "objectUUID": b"0123456789abcde8"})
765         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
766                     "name": b"OU #7",
767                     "x": "y", "y": "a",
768                     "objectUUID": b"0123456789abcde9"})
769         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
770                     "name": b"OU #8",
771                     "x": "y", "y": "a",
772                     "objectUUID": b"0123456789abcde0"})
773         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
774                     "name": b"OU #9",
775                     "x": "y", "y": "a",
776                     "objectUUID": b"0123456789abcdea"})
777         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
778                     "name": b"OU #10",
779                     "x": "y", "y": "a",
780                     "objectUUID": b"0123456789abcdeb"})
781         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
782                     "name": b"OU #10",
783                     "x": "y", "y": "a",
784                     "objectUUID": b"0123456789abcdec"})
785         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
786                     "name": b"OU #10",
787                     "x": "y", "y": "b",
788                     "objectUUID": b"0123456789abcded"})
789         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
790                     "name": b"OU #10",
791                     "x": "x", "y": "b",
792                     "objectUUID": b"0123456789abcdee"})
793         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
794                     "name": b"OU #10",
795                     "x": "x", "y": "b",
796                     "objectUUID": b"0123456789abcd01"})
797         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
798                     "name": b"OU #10",
799                     "x": "x", "y": "b",
800                     "objectUUID": b"0123456789abcd02"})
801         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
802                     "name": b"OU #10",
803                     "x": "x", "y": "b",
804                     "objectUUID": b"0123456789abcd03"})
805         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
806                     "name": b"OU #10",
807                     "x": "x", "y": "b",
808                     "objectUUID": b"0123456789abcd04"})
809         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
810                     "name": b"OU #10",
811                     "x": "x", "y": "b",
812                     "objectUUID": b"0123456789abcd05"})
813         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
814                     "name": b"OU #10",
815                     "x": "x", "y": "b",
816                     "objectUUID": b"0123456789abcd06"})
817         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
818                     "name": b"OU #10",
819                     "x": "x", "y": "b",
820                     "objectUUID": b"0123456789abcd07"})
821         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
822                     "name": b"OU #10",
823                     "x": "x", "y": "c",
824                     "objectUUID": b"0123456789abcd08"})
825         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
826                     "name": b"OU #10",
827                     "x": "x", "y": "c",
828                     "objectUUID": b"0123456789abcd09"})
829
830     def test_base(self):
831         """Testing a search"""
832
833         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
834                               scope=ldb.SCOPE_BASE)
835         self.assertEqual(len(res11), 1)
836
837     def test_base_lower(self):
838         """Testing a search"""
839
840         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
841                               scope=ldb.SCOPE_BASE)
842         self.assertEqual(len(res11), 1)
843
844     def test_base_or(self):
845         """Testing a search"""
846
847         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
848                               scope=ldb.SCOPE_BASE,
849                               expression="(|(ou=ou11)(ou=ou12))")
850         self.assertEqual(len(res11), 1)
851
852     def test_base_or2(self):
853         """Testing a search"""
854
855         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
856                               scope=ldb.SCOPE_BASE,
857                               expression="(|(x=y)(y=b))")
858         self.assertEqual(len(res11), 1)
859
860     def test_base_and(self):
861         """Testing a search"""
862
863         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
864                               scope=ldb.SCOPE_BASE,
865                               expression="(&(ou=ou11)(ou=ou12))")
866         self.assertEqual(len(res11), 0)
867
868     def test_base_and2(self):
869         """Testing a search"""
870
871         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
872                               scope=ldb.SCOPE_BASE,
873                               expression="(&(x=y)(y=a))")
874         self.assertEqual(len(res11), 1)
875
876     def test_base_false(self):
877         """Testing a search"""
878
879         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
880                               scope=ldb.SCOPE_BASE,
881                               expression="(|(ou=ou13)(ou=ou12))")
882         self.assertEqual(len(res11), 0)
883
884     def test_check_base_false(self):
885         """Testing a search"""
886         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
887                               scope=ldb.SCOPE_BASE,
888                               expression="(|(ou=ou13)(ou=ou12))")
889         self.assertEqual(len(res11), 0)
890
891     def test_check_base_error(self):
892         """Testing a search"""
893         checkbaseonsearch = {"dn": "@OPTIONS",
894                              "checkBaseOnSearch": b"TRUE"}
895         try:
896             self.l.add(checkbaseonsearch)
897         except ldb.LdbError as err:
898             enum = err.args[0]
899             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
900             m = ldb.Message.from_dict(self.l,
901                                       checkbaseonsearch)
902             self.l.modify(m)
903
904         try:
905             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
906                                   scope=ldb.SCOPE_BASE,
907                                   expression="(|(ou=ou13)(ou=ou12))")
908             self.fail("Should have failed on missing base")
909         except ldb.LdbError as err:
910             enum = err.args[0]
911             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
912
913     def test_subtree_and(self):
914         """Testing a search"""
915
916         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
917                               scope=ldb.SCOPE_SUBTREE,
918                               expression="(&(ou=ou11)(ou=ou12))")
919         self.assertEqual(len(res11), 0)
920
921     def test_subtree_and2(self):
922         """Testing a search"""
923
924         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
925                               scope=ldb.SCOPE_SUBTREE,
926                               expression="(&(x=y)(|(y=b)(y=c)))")
927         self.assertEqual(len(res11), 1)
928
929     def test_subtree_and2_lower(self):
930         """Testing a search"""
931
932         res11 = self.l.search(base="DC=samba,DC=org",
933                               scope=ldb.SCOPE_SUBTREE,
934                               expression="(&(x=y)(|(y=b)(y=c)))")
935         self.assertEqual(len(res11), 1)
936
937     def test_subtree_or(self):
938         """Testing a search"""
939
940         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
941                               scope=ldb.SCOPE_SUBTREE,
942                               expression="(|(ou=ou11)(ou=ou12))")
943         self.assertEqual(len(res11), 2)
944
945     def test_subtree_or2(self):
946         """Testing a search"""
947
948         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
949                               scope=ldb.SCOPE_SUBTREE,
950                               expression="(|(x=y)(y=b))")
951         self.assertEqual(len(res11), 20)
952
953     def test_subtree_or3(self):
954         """Testing a search"""
955
956         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
957                               scope=ldb.SCOPE_SUBTREE,
958                               expression="(|(x=y)(y=b)(y=c))")
959         self.assertEqual(len(res11), 22)
960
961     def test_one_and(self):
962         """Testing a search"""
963
964         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
965                               scope=ldb.SCOPE_ONELEVEL,
966                               expression="(&(ou=ou11)(ou=ou12))")
967         self.assertEqual(len(res11), 0)
968
969     def test_one_and2(self):
970         """Testing a search"""
971
972         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
973                               scope=ldb.SCOPE_ONELEVEL,
974                               expression="(&(x=y)(y=b))")
975         self.assertEqual(len(res11), 1)
976
977     def test_one_or(self):
978         """Testing a search"""
979
980         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
981                               scope=ldb.SCOPE_ONELEVEL,
982                               expression="(|(ou=ou11)(ou=ou12))")
983         self.assertEqual(len(res11), 2)
984
985     def test_one_or2(self):
986         """Testing a search"""
987
988         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
989                               scope=ldb.SCOPE_ONELEVEL,
990                               expression="(|(x=y)(y=b))")
991         self.assertEqual(len(res11), 20)
992
993     def test_one_or2_lower(self):
994         """Testing a search"""
995
996         res11 = self.l.search(base="DC=samba,DC=org",
997                               scope=ldb.SCOPE_ONELEVEL,
998                               expression="(|(x=y)(y=b))")
999         self.assertEqual(len(res11), 20)
1000
1001     def test_one_unindexable(self):
1002         """Testing a search"""
1003
1004         try:
1005             res11 = self.l.search(base="DC=samba,DC=org",
1006                                   scope=ldb.SCOPE_ONELEVEL,
1007                                   expression="(y=b*)")
1008             if hasattr(self, 'IDX') and \
1009                not hasattr(self, 'IDXONE') and \
1010                hasattr(self, 'IDXCHECK'):
1011                 self.fail("Should have failed as un-indexed search")
1012
1013             self.assertEqual(len(res11), 9)
1014
1015         except ldb.LdbError as err:
1016             enum = err.args[0]
1017             estr = err.args[1]
1018             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1019             self.assertIn(estr, "ldb FULL SEARCH disabled")
1020
1021     def test_one_unindexable_presence(self):
1022         """Testing a search"""
1023
1024         try:
1025             res11 = self.l.search(base="DC=samba,DC=org",
1026                                   scope=ldb.SCOPE_ONELEVEL,
1027                                   expression="(y=*)")
1028             if hasattr(self, 'IDX') and \
1029                not hasattr(self, 'IDXONE') and \
1030                hasattr(self, 'IDXCHECK'):
1031                 self.fail("Should have failed as un-indexed search")
1032
1033             self.assertEqual(len(res11), 24)
1034
1035         except ldb.LdbError as err:
1036             enum = err.args[0]
1037             estr = err.args[1]
1038             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1039             self.assertIn(estr, "ldb FULL SEARCH disabled")
1040
1041
1042     def test_subtree_and_or(self):
1043         """Testing a search"""
1044
1045         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1046                               scope=ldb.SCOPE_SUBTREE,
1047                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1048         self.assertEqual(len(res11), 0)
1049
1050     def test_subtree_and_or2(self):
1051         """Testing a search"""
1052
1053         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1054                               scope=ldb.SCOPE_SUBTREE,
1055                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1056         self.assertEqual(len(res11), 0)
1057
1058     def test_subtree_and_or3(self):
1059         """Testing a search"""
1060
1061         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1062                               scope=ldb.SCOPE_SUBTREE,
1063                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1064         self.assertEqual(len(res11), 2)
1065
1066     def test_subtree_and_or4(self):
1067         """Testing a search"""
1068
1069         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1070                               scope=ldb.SCOPE_SUBTREE,
1071                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1072         self.assertEqual(len(res11), 2)
1073
1074     def test_subtree_and_or5(self):
1075         """Testing a search"""
1076
1077         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1078                               scope=ldb.SCOPE_SUBTREE,
1079                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1080         self.assertEqual(len(res11), 1)
1081
1082     def test_subtree_or_and(self):
1083         """Testing a search"""
1084
1085         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1086                               scope=ldb.SCOPE_SUBTREE,
1087                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1088         self.assertEqual(len(res11), 10)
1089
1090     def test_subtree_large_and_unique(self):
1091         """Testing a search"""
1092
1093         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1094                               scope=ldb.SCOPE_SUBTREE,
1095                               expression="(&(ou=ou10)(y=a))")
1096         self.assertEqual(len(res11), 1)
1097
1098     def test_subtree_and_none(self):
1099         """Testing a search"""
1100
1101         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1102                               scope=ldb.SCOPE_SUBTREE,
1103                               expression="(&(ou=ouX)(y=a))")
1104         self.assertEqual(len(res11), 0)
1105
1106     def test_subtree_and_idx_record(self):
1107         """Testing a search against the index record"""
1108
1109         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1110                               scope=ldb.SCOPE_SUBTREE,
1111                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1112         self.assertEqual(len(res11), 0)
1113
1114     def test_subtree_and_idxone_record(self):
1115         """Testing a search against the index record"""
1116
1117         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1118                               scope=ldb.SCOPE_SUBTREE,
1119                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1120         self.assertEqual(len(res11), 0)
1121
1122     def test_subtree_unindexable(self):
1123         """Testing a search"""
1124
1125         try:
1126             res11 = self.l.search(base="DC=samba,DC=org",
1127                                   scope=ldb.SCOPE_SUBTREE,
1128                                   expression="(y=b*)")
1129             if hasattr(self, 'IDX') and \
1130                hasattr(self, 'IDXCHECK'):
1131                 self.fail("Should have failed as un-indexed search")
1132
1133             self.assertEqual(len(res11), 9)
1134
1135         except ldb.LdbError as err:
1136             enum = err.args[0]
1137             estr = err.args[1]
1138             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1139             self.assertIn(estr, "ldb FULL SEARCH disabled")
1140
1141     def test_subtree_unindexable_presence(self):
1142         """Testing a search"""
1143
1144         try:
1145             res11 = self.l.search(base="DC=samba,DC=org",
1146                                   scope=ldb.SCOPE_SUBTREE,
1147                                   expression="(y=*)")
1148             if hasattr(self, 'IDX') and \
1149                hasattr(self, 'IDXCHECK'):
1150                 self.fail("Should have failed as un-indexed search")
1151
1152             self.assertEqual(len(res11), 24)
1153
1154         except ldb.LdbError as err:
1155             enum = err.args[0]
1156             estr = err.args[1]
1157             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1158             self.assertIn(estr, "ldb FULL SEARCH disabled")
1159
1160
1161     def test_dn_filter_one(self):
1162         """Testing that a dn= filter succeeds
1163         (or fails with disallowDNFilter
1164         set and IDXGUID or (IDX and not IDXONE) mode)
1165         when the scope is SCOPE_ONELEVEL.
1166
1167         This should be made more consistent, but for now lock in
1168         the behaviour
1169
1170         """
1171
1172         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1173                               scope=ldb.SCOPE_ONELEVEL,
1174                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1175         if hasattr(self, 'disallowDNFilter') and \
1176            hasattr(self, 'IDX') and \
1177            (hasattr(self, 'IDXGUID') or \
1178             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1179             self.assertEqual(len(res11), 0)
1180         else:
1181             self.assertEqual(len(res11), 1)
1182
1183     def test_dn_filter_subtree(self):
1184         """Testing that a dn= filter succeeds
1185         (or fails with disallowDNFilter set)
1186         when the scope is SCOPE_SUBTREE"""
1187
1188         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1189                               scope=ldb.SCOPE_SUBTREE,
1190                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1191         if hasattr(self, 'disallowDNFilter') \
1192            and hasattr(self, 'IDX'):
1193             self.assertEqual(len(res11), 0)
1194         else:
1195             self.assertEqual(len(res11), 1)
1196
1197     def test_dn_filter_base(self):
1198         """Testing that (incorrectly) a dn= filter works
1199         when the scope is SCOPE_BASE"""
1200
1201         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1202                               scope=ldb.SCOPE_BASE,
1203                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1204
1205         # At some point we should fix this, but it isn't trivial
1206         self.assertEqual(len(res11), 1)
1207
1208     def test_distinguishedName_filter_one(self):
1209         """Testing that a distinguishedName= filter succeeds
1210         when the scope is SCOPE_ONELEVEL.
1211
1212         This should be made more consistent, but for now lock in
1213         the behaviour
1214
1215         """
1216
1217         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1218                               scope=ldb.SCOPE_ONELEVEL,
1219                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1220         self.assertEqual(len(res11), 1)
1221
1222     def test_distinguishedName_filter_subtree(self):
1223         """Testing that a distinguishedName= filter succeeds
1224         when the scope is SCOPE_SUBTREE"""
1225
1226         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1227                               scope=ldb.SCOPE_SUBTREE,
1228                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1229         self.assertEqual(len(res11), 1)
1230
1231     def test_distinguishedName_filter_base(self):
1232         """Testing that (incorrectly) a distinguishedName= filter works
1233         when the scope is SCOPE_BASE"""
1234
1235         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1236                               scope=ldb.SCOPE_BASE,
1237                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1238
1239         # At some point we should fix this, but it isn't trivial
1240         self.assertEqual(len(res11), 1)
1241
1242     def test_bad_dn_filter_base(self):
1243         """Testing that a dn= filter on an invalid DN works
1244         when the scope is SCOPE_BASE but
1245         returns zero results"""
1246
1247         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1248                               scope=ldb.SCOPE_BASE,
1249                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1250
1251         # At some point we should fix this, but it isn't trivial
1252         self.assertEqual(len(res11), 0)
1253
1254
1255     def test_bad_dn_filter_one(self):
1256         """Testing that a dn= filter succeeds but returns zero
1257         results when the DN is not valid on a SCOPE_ONELEVEL search
1258
1259         """
1260
1261         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1262                               scope=ldb.SCOPE_ONELEVEL,
1263                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1264         self.assertEqual(len(res11), 0)
1265
1266     def test_bad_dn_filter_subtree(self):
1267         """Testing that a dn= filter succeeds but returns zero
1268         results when the DN is not valid on a SCOPE_SUBTREE search
1269
1270         """
1271
1272         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1273                               scope=ldb.SCOPE_SUBTREE,
1274                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1275         self.assertEqual(len(res11), 0)
1276
1277     def test_bad_distinguishedName_filter_base(self):
1278         """Testing that a distinguishedName= filter on an invalid DN works
1279         when the scope is SCOPE_BASE but
1280         returns zero results"""
1281
1282         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1283                               scope=ldb.SCOPE_BASE,
1284                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1285
1286         # At some point we should fix this, but it isn't trivial
1287         self.assertEqual(len(res11), 0)
1288
1289
1290     def test_bad_distinguishedName_filter_one(self):
1291         """Testing that a distinguishedName= filter succeeds but returns zero
1292         results when the DN is not valid on a SCOPE_ONELEVEL search
1293
1294         """
1295
1296         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1297                               scope=ldb.SCOPE_ONELEVEL,
1298                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1299         self.assertEqual(len(res11), 0)
1300
1301     def test_bad_distinguishedName_filter_subtree(self):
1302         """Testing that a distinguishedName= filter succeeds but returns zero
1303         results when the DN is not valid on a SCOPE_SUBTREE search
1304
1305         """
1306
1307         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1308                               scope=ldb.SCOPE_SUBTREE,
1309                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1310         self.assertEqual(len(res11), 0)
1311
1312     def test_bad_dn_search_base(self):
1313         """Testing with a bad base DN (SCOPE_BASE)"""
1314
1315         try:
1316             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1317                                   scope=ldb.SCOPE_BASE)
1318             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1319         except ldb.LdbError as err:
1320             enum = err.args[0]
1321             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1322
1323
1324     def test_bad_dn_search_one(self):
1325         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1326
1327         try:
1328             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1329                               scope=ldb.SCOPE_ONELEVEL)
1330             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1331         except ldb.LdbError as err:
1332             enum = err.args[0]
1333             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1334
1335     def test_bad_dn_search_subtree(self):
1336         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1337
1338         try:
1339             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1340                                   scope=ldb.SCOPE_SUBTREE)
1341             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1342         except ldb.LdbError as err:
1343             enum = err.args[0]
1344             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1345
1346
1347
1348 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """The SearchTests cases, run with the mdb:// prefix and MDB index"""

    def setUp(self):
        # Both attributes are set before the inherited setUp runs
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(SearchTestsLmdb, self).tearDown()
1358
1359
class IndexedSearchTests(SearchTests):
    """The SearchTests cases with an attribute index on x, y and ou,
       to check that using the index does not break searches"""
    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker that the tests probe with hasattr(self, 'IDX')
        self.IDX = True
1368
class IndexedCheckSearchTests(IndexedSearchTests):
    """The indexed search cases with IDXCHECK set (full scan disabled)"""
    def setUp(self):
        # The flag has to exist before the inherited setUp is entered
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
1375
class IndexedSearchDnFilterTests(SearchTests):
    """The SearchTests cases with an attribute index and the
       disallowDNFilter option turned on"""
    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        self.IDX = True
1388
class IndexedAndOneLevelSearchTests(SearchTests):
    """The SearchTests cases with an attribute index plus @IDXONE,
       to check that using the index does not break searches"""
    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Markers that the tests probe with hasattr()
        self.IDX = True
        self.IDXONE = True
1399
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """The indexed + @IDXONE search cases with IDXCHECK set
       (full scan disabled)"""
    def setUp(self):
        # The flag has to exist before the inherited setUp is entered
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1406
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """The SearchTests cases with an attribute index plus @IDXONE and
       the disallowDNFilter / checkBaseOnSearch options turned on"""
    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        self.IDX = True
        self.IDXONE = True
1423
class GUIDIndexedSearchTests(SearchTests):
    """The SearchTests cases with a GUID-keyed index record
       (@IDXGUID on objectUUID), to check the index does not break
       searches"""
    def setUp(self):
        # self.index is assigned before the inherited setUp is entered
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedSearchTests, self).setUp()

        # Markers that the tests probe with hasattr()
        self.IDXGUID = True
        self.IDXONE = True
1436
1437
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """The SearchTests cases with a GUID-keyed index record and the
       disallowDNFilter / checkBaseOnSearch options turned on"""
    def setUp(self):
        # self.index is assigned before the inherited setUp is entered
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Markers that the tests probe with hasattr()
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXGUID = True
1454
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """The SearchTests cases with a GUID-keyed index record, the
       disallowDNFilter / checkBaseOnSearch options and the IDXONE
       marker set"""
    def setUp(self):
        # self.index is assigned before the inherited setUp is entered
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Markers that the tests probe with hasattr()
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXGUID = True
        self.IDXONE = True
1472
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """The GUIDIndexedSearchTests cases, run with the mdb:// prefix"""

    def setUp(self):
        # The prefix is chosen before the inherited setUp is entered
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1481
1482
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """The GUIDIndexedDNFilterSearchTests cases, run with the mdb:// prefix"""

    def setUp(self):
        # The prefix is chosen before the inherited setUp is entered
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1491
1492
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """The GUIDAndOneLevelIndexedSearchTests cases, run with the
       mdb:// prefix"""

    def setUp(self):
        # The prefix is chosen before the inherited setUp is entered
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1501
1502
1503 class AddModifyTests(LdbBaseTest):
1504     def tearDown(self):
1505         shutil.rmtree(self.testdir)
1506         super(AddModifyTests, self).tearDown()
1507
1508         # Ensure the LDB is closed now, so we close the FD
1509         del(self.l)
1510
1511     def setUp(self):
1512         super(AddModifyTests, self).setUp()
1513         self.testdir = tempdir()
1514         self.filename = os.path.join(self.testdir, "add_test.ldb")
1515         self.l = ldb.Ldb(self.url(),
1516                          flags=self.flags(),
1517                          options=["modules:rdn_name"])
1518         try:
1519             self.l.add(self.index)
1520         except AttributeError:
1521             pass
1522
1523         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1524                     "name": b"samba.org",
1525                     "objectUUID": b"0123456789abcdef"})
1526         self.l.add({"dn": "@ATTRIBUTES",
1527                     "objectUUID": "UNIQUE_INDEX"})
1528
1529     def test_add_dup(self):
1530         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1531                     "name": b"Admins",
1532                     "x": "z", "y": "a",
1533                     "objectUUID": b"0123456789abcde1"})
1534         try:
1535             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1536                         "name": b"Admins",
1537                         "x": "z", "y": "a",
1538                         "objectUUID": b"0123456789abcde2"})
1539             self.fail("Should have failed adding dupliate entry")
1540         except ldb.LdbError as err:
1541             enum = err.args[0]
1542             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1543
1544     def test_add_bad(self):
1545         try:
1546             self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
1547                         "name": b"Admins",
1548                         "x": "z", "y": "a",
1549                         "objectUUID": b"0123456789abcde1"})
1550             self.fail("Should have failed adding entry with invalid DN")
1551         except ldb.LdbError as err:
1552             enum = err.args[0]
1553             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1554
1555     def test_add_del_add(self):
1556         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1557                     "name": b"Admins",
1558                     "x": "z", "y": "a",
1559                     "objectUUID": b"0123456789abcde1"})
1560         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1561         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1562                     "name": b"Admins",
1563                     "x": "z", "y": "a",
1564                     "objectUUID": b"0123456789abcde2"})
1565
1566     def test_add_move_add(self):
1567         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1568                     "name": b"Admins",
1569                     "x": "z", "y": "a",
1570                     "objectUUID": b"0123456789abcde1"})
1571         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1572                       "OU=DUP2,DC=SAMBA,DC=ORG")
1573         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1574                     "name": b"Admins",
1575                     "x": "z", "y": "a",
1576                     "objectUUID": b"0123456789abcde2"})
1577
1578     def test_add_move_fail_move_move(self):
1579         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1580                     "name": b"Admins",
1581                     "x": "z", "y": "a",
1582                     "objectUUID": b"0123456789abcde1"})
1583         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1584                     "name": b"Admins",
1585                     "x": "z", "y": "a",
1586                     "objectUUID": b"0123456789abcde2"})
1587
1588         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1589                              scope=ldb.SCOPE_SUBTREE,
1590                              expression="(objectUUID=0123456789abcde1)")
1591         self.assertEqual(len(res2), 1)
1592         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1593
1594         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1595                              scope=ldb.SCOPE_SUBTREE,
1596                              expression="(objectUUID=0123456789abcde2)")
1597         self.assertEqual(len(res3), 1)
1598         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1599
1600         try:
1601             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1602                           "OU=DUP2,DC=SAMBA,DC=ORG")
1603             self.fail("Should have failed on duplicate DN")
1604         except ldb.LdbError as err:
1605             enum = err.args[0]
1606             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1607
1608         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1609                       "OU=DUP3,DC=SAMBA,DC=ORG")
1610
1611         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1612                       "OU=DUP2,DC=SAMBA,DC=ORG")
1613
1614         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1615                              scope=ldb.SCOPE_SUBTREE,
1616                              expression="(objectUUID=0123456789abcde1)")
1617         self.assertEqual(len(res2), 1)
1618         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1619
1620         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1621                              scope=ldb.SCOPE_SUBTREE,
1622                              expression="(objectUUID=0123456789abcde2)")
1623         self.assertEqual(len(res3), 1)
1624         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1625
1626     def test_move_missing(self):
1627         try:
1628             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1629                           "OU=DUP2,DC=SAMBA,DC=ORG")
1630             self.fail("Should have failed on missing")
1631         except ldb.LdbError as err:
1632             enum = err.args[0]
1633             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1634
1635     def test_move_missing2(self):
1636         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1637                     "name": b"Admins",
1638                     "x": "z", "y": "a",
1639                     "objectUUID": b"0123456789abcde2"})
1640
1641         try:
1642             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1643                           "OU=DUP2,DC=SAMBA,DC=ORG")
1644             self.fail("Should have failed on missing")
1645         except ldb.LdbError as err:
1646             enum = err.args[0]
1647             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1648
1649     def test_move_bad(self):
1650         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1651                     "name": b"Admins",
1652                     "x": "z", "y": "a",
1653                     "objectUUID": b"0123456789abcde2"})
1654
1655         try:
1656             self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
1657                           "OU=DUP2,DC=SAMBA,DC=ORG")
1658             self.fail("Should have failed on invalid DN")
1659         except ldb.LdbError as err:
1660             enum = err.args[0]
1661             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1662
1663     def test_move_bad2(self):
1664         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1665                     "name": b"Admins",
1666                     "x": "z", "y": "a",
1667                     "objectUUID": b"0123456789abcde2"})
1668
1669         try:
1670             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1671                           "OUXDUP2,DC=SAMBA,DC=ORG")
1672             self.fail("Should have failed on missing")
1673         except ldb.LdbError as err:
1674             enum = err.args[0]
1675             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1676
1677     def test_move_fail_move_add(self):
1678         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1679                     "name": b"Admins",
1680                     "x": "z", "y": "a",
1681                     "objectUUID": b"0123456789abcde1"})
1682         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1683                     "name": b"Admins",
1684                     "x": "z", "y": "a",
1685                     "objectUUID": b"0123456789abcde2"})
1686         try:
1687             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1688                           "OU=DUP2,DC=SAMBA,DC=ORG")
1689             self.fail("Should have failed on duplicate DN")
1690         except ldb.LdbError as err:
1691             enum = err.args[0]
1692             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1693
1694         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1695                       "OU=DUP3,DC=SAMBA,DC=ORG")
1696
1697         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1698                     "name": b"Admins",
1699                     "x": "z", "y": "a",
1700                     "objectUUID": b"0123456789abcde3"})
1701
1702
class AddModifyTestsLmdb(AddModifyTests):
    """Re-run the inherited AddModifyTests cases with the mdb:// prefix
    and the MDB_INDEX_OBJ index definition."""

    def setUp(self):
        # Both attributes are assigned before delegating to the parent setUp.
        self.prefix, self.index = MDB_PREFIX, MDB_INDEX_OBJ
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing extra to undo here; defer entirely to the parent class.
        super(AddModifyTestsLmdb, self).tearDown()
1712
class IndexedAddModifyTests(AddModifyTests):
    """Test searches using the index, to ensure the index doesn't
       break things"""

    def setUp(self):
        # A subclass may assign self.index before delegating here; only
        # fall back to this attribute index when none has been chosen.
        if not hasattr(self, 'index'):
            self.index = {"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                          "@IDXONE": [b"1"]}
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        """Adding a new DN with an objectUUID that is expected to exist
        already must fail with ERR_CONSTRAINT_VIOLATION.

        NOTE(review): the pre-existing record is presumably created by the
        inherited setUp, which is not visible here -- confirm.
        """
        try:
            self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcdef"})
            self.fail("Should have failed adding duplicate GUID")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

    def test_duplicate_name_dup_GUID(self):
        """Adding the very same record twice (same DN, same objectUUID)
        must fail with ERR_ENTRY_ALREADY_EXISTS."""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"a123456789abcdef"})
            self.fail("Should have failed adding duplicate DN and GUID")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

    def test_duplicate_name_dup_GUID2(self):
        """A rejected duplicate-DN add must not leave its objectUUID behind:
        the same objectUUID is accepted afterwards on a different DN."""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"abc3456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"aaa3456789abcdef"})
            self.fail("Should have failed adding duplicate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        # Checking the GUID didn't stick in the index
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"aaa3456789abcdef"})

    def test_add_dup_guid_add(self):
        """A duplicate-objectUUID add on a new DN must fail with
        ERR_CONSTRAINT_VIOLATION, and that DN must stay usable with a
        fresh objectUUID."""
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde1"})
        try:
            self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcde1"})
            self.fail("Should have failed on duplicate GUID")

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde2"})
1790
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Run the indexed add/modify tests with an @INDEXLIST that also
       sets @IDXGUID and @IDX_DN_GUID"""

    def setUp(self):
        # Assigned before the parent setUp so its hasattr() check keeps
        # this definition instead of installing the default index.
        guid_index = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou"],
            "@IDXONE": [b"1"],
            "@IDXGUID": [b"objectUUID"],
            "@IDX_DN_GUID": [b"GUID"],
        }
        self.index = guid_index
        super(GUIDIndexedAddModifyTests, self).setUp()
1801
1802
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        # The parent fixture is built first; the transaction opened here
        # stays open for the body of each test.
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        database = self.l
        database.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown runs.
        database = self.l
        database.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
1812
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        # The parent fixture is built first; the transaction opened here
        # stays open for the body of each test.
        super(TransIndexedAddModifyTests, self).setUp()
        database = self.l
        database.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown runs.
        database = self.l
        database.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
1822
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Re-run the GUIDIndexedAddModifyTests cases with the mdb:// prefix."""

    def setUp(self):
        # The prefix is assigned before delegating to the parent setUp.
        setattr(self, "prefix", MDB_PREFIX)
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing extra to undo here; defer entirely to the parent class.
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1831
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Re-run the GUIDTransIndexedAddModifyTests cases with the mdb://
    prefix."""

    def setUp(self):
        # The prefix is assigned before delegating to the parent setUp.
        setattr(self, "prefix", MDB_PREFIX)
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing extra to undo here; defer entirely to the parent class.
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1840
class BadIndexTests(LdbBaseTest):
    """Check that an indexed attribute stays searchable after its
    @ATTRIBUTES definition is changed (or a change is attempted).

    Subclasses that set ``self.IDXGUID`` before calling setUp get an
    @INDEXLIST that also carries @IDXGUID / @IDX_DN_GUID.
    """

    # DN component and objectUUID for the three records every test adds.
    _RECORDS = (("x", b"0123456789abcde1"),
                ("y", b"0123456789abcde2"),
                ("z", b"0123456789abcde3"))

    def setUp(self):
        """Create a database in a fresh temp dir and install the index list."""
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def _add_records(self, y_values):
        """Add x=x, x=y and x=z under dc=samba,dc=org, in that order,
        giving each the matching entry of *y_values* as its 'y'."""
        for (rdn, guid), y_value in zip(self._RECORDS, y_values):
            self.ldb.add({"dn": "x=%s,dc=samba,dc=org" % rdn,
                          "objectUUID": guid,
                          "y": y_value})

    def _search_y(self, value):
        """Return the result of searching dc=samba,dc=org for (y=value)."""
        return self.ldb.search(expression="(y=%s)" % value,
                               base="dc=samba,dc=org")

    def test_unique(self):
        """A rejected UNIQUE_INDEX request must leave the index working."""
        self._add_records(["1", "1", "1"])
        self.assertEqual(len(self._search_y("1")), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
            self.fail()
        except ldb.LdbError:
            pass

        # We must still have a working index
        self.assertEqual(len(self._search_y("1")), 3)

    def test_unique_transaction(self):
        """Inside a transaction the UNIQUE_INDEX request makes the commit
        fail with ERR_OPERATIONS_ERROR; the index must still work."""
        self._add_records(["1", "1", "1"])
        self.assertEqual(len(self._search_y("1")), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            # Deliberately ignored: the commit below is what is checked.
            pass

        try:
            self.ldb.transaction_commit()
            self.fail()

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        self.assertEqual(len(self._search_y("1")), 3)

    def _assert_casefolded_results(self):
        """Check the (y=a) result count once 'y' is CASE_INSENSITIVE."""
        # We must still have a working index
        res = self._search_y("a")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold(self):
        """Switching 'y' to CASE_INSENSITIVE must keep (y=a) searchable."""
        self._add_records(["a", "A", ["a", "A"]])
        self.assertEqual(len(self._search_y("a")), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self._assert_casefolded_results()

    def test_casefold_transaction(self):
        """Same as test_casefold, with the change made in a transaction."""
        self._add_records(["a", "A", ["a", "A"]])
        self.assertEqual(len(self._search_y("a")), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        self._assert_casefolded_results()

    def tearDown(self):
        """Drop the database handle and remove the temp dir from setUp."""
        super(BadIndexTests, self).tearDown()
        del self.ldb
        # ignore_errors: harmless if the directory has already been removed.
        shutil.rmtree(self.testdir, ignore_errors=True)
1991
1992
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # The parent setUp and tests only check that this attribute exists
        # (via hasattr), so it has to be set before delegating.
        setattr(self, "IDXGUID", True)
        super(GUIDBadIndexTests, self).setUp()
1999
class DnTests(TestCase):
    """Tests for ldb.Dn: construction, comparison, string forms, component
    access and extended components, plus LDIF parsing via ldb.Ldb."""

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del self.ldb

    def test_set_dn_invalid(self):
        x = ldb.Message()
        # Assigning a plain string to Message.dn must be rejected.
        self.assertRaises(TypeError, setattr, x, "dn", "astring")

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(str(x), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(repr(x), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold(self):
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", str(x.parent()))

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add_child_str(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base_str(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x) - 1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        # Each parsed item is indexed at [1] to reach the ldb.Message.
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range indexes: these used to call get_component_name(),
        # leaving get_component_value() unchecked for them.
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # Index 2 is past the end; the DN must be left untouched.
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    # Renamed from a second "test_get_casefold": the duplicate name
    # replaced the earlier definition, so that test was never run.
    def test_get_casefold_2(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
2241
2242 class LdbMsgTests(TestCase):
2243
2244     def setUp(self):
2245         super(LdbMsgTests, self).setUp()
2246         self.msg = ldb.Message()
2247
2248     def test_init_dn(self):
2249         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
2250         self.assertEqual("dc=foo27", str(self.msg.dn))
2251
2252     def test_iter_items(self):
2253         self.assertEqual(0, len(self.msg.items()))
2254         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
2255         self.assertEqual(1, len(self.msg.items()))
2256
2257     def test_repr(self):
2258         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2259         self.msg["dc"] = b"foo"
2260         if PY3:
2261             self.assertIn(repr(self.msg), [
2262                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2263                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2264             ])
2265             self.assertIn(repr(self.msg.text), [
2266                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2267                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2268             ])
2269         else:
2270             self.assertEqual(
2271                 repr(self.msg),
2272                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
2273             self.assertEqual(
2274                 repr(self.msg.text),
2275                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
2276
2277     def test_len(self):
2278         self.assertEqual(0, len(self.msg))
2279
2280     def test_notpresent(self):
2281         self.assertRaises(KeyError, lambda: self.msg["foo"])
2282
2283     def test_del(self):
2284         del self.msg["foo"]
2285
2286     def test_add(self):
2287         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2288
2289     def test_add_text(self):
2290         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2291
2292     def test_elements_empty(self):
2293         self.assertEqual([], self.msg.elements())
2294
2295     def test_elements(self):
2296         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2297         self.msg.add(el)
2298         self.assertEqual([el], self.msg.elements())
2299         self.assertEqual([el.text], self.msg.text.elements())
2300
2301     def test_add_value(self):
2302         self.assertEqual(0, len(self.msg))
2303         self.msg["foo"] = [b"foo"]
2304         self.assertEqual(1, len(self.msg))
2305
2306     def test_add_value_text(self):
2307         self.assertEqual(0, len(self.msg))
2308         self.msg["foo"] = ["foo"]
2309         self.assertEqual(1, len(self.msg))
2310
2311     def test_add_value_multiple(self):
2312         self.assertEqual(0, len(self.msg))
2313         self.msg["foo"] = [b"foo", b"bla"]
2314         self.assertEqual(1, len(self.msg))
2315         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2316
2317     def test_add_value_multiple_text(self):
2318         self.assertEqual(0, len(self.msg))
2319         self.msg["foo"] = ["foo", "bla"]
2320         self.assertEqual(1, len(self.msg))
2321         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2322
2323     def test_set_value(self):
2324         self.msg["foo"] = [b"fool"]
2325         self.assertEqual([b"fool"], list(self.msg["foo"]))
2326         self.msg["foo"] = [b"bar"]
2327         self.assertEqual([b"bar"], list(self.msg["foo"]))
2328
2329     def test_set_value_text(self):
2330         self.msg["foo"] = ["fool"]
2331         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2332         self.msg["foo"] = ["bar"]
2333         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2334
2335     def test_keys(self):
2336         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2337         self.msg["foo"] = [b"bla"]
2338         self.msg["bar"] = [b"bla"]
2339         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
2340
2341     def test_keys_text(self):
2342         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2343         self.msg["foo"] = ["bla"]
2344         self.msg["bar"] = ["bla"]
2345         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
2346
2347     def test_dn(self):
2348         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2349         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2350
2351     def test_get_dn(self):
2352         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2353         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2354
2355     def test_dn_text(self):
2356         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2357         self.assertEqual("@BASEINFO", str(self.msg.dn))
2358         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2359
2360     def test_get_dn_text(self):
2361         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2362         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2363         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2364
2365     def test_get_invalid(self):
2366         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2367         self.assertRaises(TypeError, self.msg.get, 42)
2368
2369     def test_get_other(self):
2370         self.msg["foo"] = [b"bar"]
2371         self.assertEqual(b"bar", self.msg.get("foo")[0])
2372         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2373         self.assertEqual(None, self.msg.get("foo", idx=1))
2374         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2375
2376     def test_get_other_text(self):
2377         self.msg["foo"] = ["bar"]
2378         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2379         self.assertEqual("bar", self.msg.text.get("foo")[0])
2380         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2381         self.assertEqual(None, self.msg.get("foo", idx=1))
2382         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2383
2384     def test_get_default(self):
2385         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2386         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2387
2388     def test_get_default_text(self):
2389         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2390         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2391
2392     def test_get_unknown(self):
2393         self.assertEqual(None, self.msg.get("lalalala"))
2394
2395     def test_get_unknown_text(self):
2396         self.assertEqual(None, self.msg.text.get("lalalala"))
2397
2398     def test_msg_diff(self):
2399         l = ldb.Ldb()
2400         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2401         msg1 = next(msgs)[1]
2402         msg2 = next(msgs)[1]
2403         msgdiff = l.msg_diff(msg1, msg2)
2404         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2405         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2406         self.assertEqual(1, len(msgdiff))
2407
2408     def test_equal_empty(self):
2409         msg1 = ldb.Message()
2410         msg2 = ldb.Message()
2411         self.assertEqual(msg1, msg2)
2412
2413     def test_equal_simplel(self):
2414         db = ldb.Ldb()
2415         msg1 = ldb.Message()
2416         msg1.dn = ldb.Dn(db, "foo=bar")
2417         msg2 = ldb.Message()
2418         msg2.dn = ldb.Dn(db, "foo=bar")
2419         self.assertEqual(msg1, msg2)
2420         msg1['foo'] = b'bar'
2421         msg2['foo'] = b'bar'
2422         self.assertEqual(msg1, msg2)
2423         msg2['foo'] = b'blie'
2424         self.assertNotEqual(msg1, msg2)
2425         msg2['foo'] = b'blie'
2426
2427     def test_from_dict(self):
2428         rec = {"dn": "dc=fromdict",
2429                "a1": [b"a1-val1", b"a1-val1"]}
2430         l = ldb.Ldb()
2431         # check different types of input Flags
2432         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2433             m = ldb.Message.from_dict(l, rec, flags)
2434             self.assertEqual(rec["a1"], list(m["a1"]))
2435             self.assertEqual(flags, m["a1"].flags())
2436         # check input params
2437         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2438         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2439         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2440         # Message.from_dict expects dictionary with 'dn'
2441         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2442         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2443
2444     def test_from_dict_text(self):
2445         rec = {"dn": "dc=fromdict",
2446                "a1": ["a1-val1", "a1-val1"]}
2447         l = ldb.Ldb()
2448         # check different types of input Flags
2449         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2450             m = ldb.Message.from_dict(l, rec, flags)
2451             self.assertEqual(rec["a1"], list(m.text["a1"]))
2452             self.assertEqual(flags, m.text["a1"].flags())
2453         # check input params
2454         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2455         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2456         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2457         # Message.from_dict expects dictionary with 'dn'
2458         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2459         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2460
2461     def test_copy_add_message_element(self):
2462         m = ldb.Message()
2463         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2464         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2465         mto = ldb.Message()
2466         mto["1"] = m["1"]
2467         mto["2"] = m["2"]
2468         self.assertEqual(mto["1"], m["1"])
2469         self.assertEqual(mto["2"], m["2"])
2470         mto = ldb.Message()
2471         mto.add(m["1"])
2472         mto.add(m["2"])
2473         self.assertEqual(mto["1"], m["1"])
2474         self.assertEqual(mto["2"], m["2"])
2475
2476     def test_copy_add_message_element_text(self):
2477         m = ldb.Message()
2478         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2479         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2480         mto = ldb.Message()
2481         mto["1"] = m["1"]
2482         mto["2"] = m["2"]
2483         self.assertEqual(mto["1"], m.text["1"])
2484         self.assertEqual(mto["2"], m.text["2"])
2485         mto = ldb.Message()
2486         mto.add(m["1"])
2487         mto.add(m["2"])
2488         self.assertEqual(mto.text["1"], m.text["1"])
2489         self.assertEqual(mto.text["2"], m.text["2"])
2490         self.assertEqual(mto["1"], m["1"])
2491         self.assertEqual(mto["2"], m["2"])
2492
2493
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement and its .text view.

    None of these tests opens a database; they only construct elements.
    """

    def test_cmp_element(self):
        # Elements with the same values are equal, others are not.
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement([b"foo"])
        z = ldb.MessageElement([b"bzr"])
        self.assertEqual(x, y)
        self.assertNotEqual(x, z)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from the
        # corresponding str.
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement(["foo"])
        self.assertEqual(x, y)

    def test_create_iterable(self):
        # Iterating yields bytes; iterating .text yields str.
        x = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(x))
        self.assertEqual(["foo"], list(x.text))

    def test_repr(self):
        # The expected repr carries a b'' prefix on Python 3 only.
        x = ldb.MessageElement([b"foo"])
        if PY3:
            self.assertEqual("MessageElement([b'foo'])", repr(x))
            self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo'])", repr(x))
            self.assertEqual("MessageElement(['foo']).text", repr(x.text))
        x = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(x))
        if PY3:
            self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
            self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo','bla'])", repr(x))
            self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))

    def test_get_item(self):
        # Positive and negative indexes work; out of range raises
        # IndexError.
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", x[0])
        self.assertEqual(b"bar", x[1])
        self.assertEqual(b"bar", x[-1])
        self.assertRaises(IndexError, lambda: x[45])

    def test_get_item_text(self):
        # Same as test_get_item, but every access goes through .text,
        # including the out-of-range one.
        x = ldb.MessageElement(["foo", "bar"])
        self.assertEqual("foo", x.text[0])
        self.assertEqual("bar", x.text[1])
        self.assertEqual("bar", x.text[-1])
        self.assertRaises(IndexError, lambda: x.text[45])

    def test_len(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(x))

    def test_eq(self):
        # Equality follows the values as x and y are rebuilt in turn.
        x = ldb.MessageElement([b"foo", b"bar"])
        y = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(y, x)
        x = ldb.MessageElement([b"foo"])
        self.assertNotEqual(y, x)
        y = ldb.MessageElement([b"foo"])
        self.assertEqual(y, x)

    def test_extended(self):
        # The expected repr shows only the values, not the flags or the
        # name passed to the constructor.
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if PY3:
            self.assertEqual("MessageElement([b'456'])", repr(el))
            self.assertEqual("MessageElement([b'456']).text", repr(el.text))
        else:
            self.assertEqual("MessageElement(['456'])", repr(el))
            self.assertEqual("MessageElement(['456']).text", repr(el.text))

    def test_bad_text(self):
        # Reading bytes that cannot be decoded through .text must raise
        # UnicodeDecodeError.
        el = ldb.MessageElement(b'\xba\xdd')
        self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2569
2570
class ModuleTests(TestCase):
    """Tests for registering ldb modules implemented as Python classes."""

    def setUp(self):
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # Chain to the parent's tearDown (not setUp) so that base-class
        # cleanup actually runs.
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        # A class with only a "name" attribute is accepted.
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records which module hooks were invoked.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        l = ldb.Ldb(self.filename)
        # List the module in @MODULES; it must not have been
        # instantiated yet at this point.
        l.add({"dn": "@MODULES", "@LIST": "bla"})
        self.assertEqual([], ops)
        # Re-opening the database must instantiate it exactly once.
        l = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2609
class LdbResultTests(LdbBaseTest):
    """Tests for search results and controls on a small populated database.

    setUp() adds DC=SAMBA,DC=ORG and twelve OU entries beneath it, so
    thirteen records are expected under that base.
    """

    def setUp(self):
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        # Subclasses may set self.index to a record that has to be
        # added first; this class does not define it, hence the guard.
        try:
            self.l.add(self.index)
        except AttributeError:
            pass
        records = [
            ("DC=SAMBA,DC=ORG", b"samba.org", b"0123456789abcde0"),
            ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", b"0123456789abcde1"),
            ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", b"0123456789abcde2"),
            ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", b"0123456789abcde3"),
            ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", b"0123456789abcde4"),
            ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", b"0123456789abcde5"),
            ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", b"0123456789abcde6"),
            ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", b"0123456789abcde7"),
            ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", b"0123456789abcde8"),
            ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", b"0123456789abcde9"),
            ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", b"0123456789abcdea"),
            ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", b"0123456789abcdeb"),
            ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", b"0123456789abcdec"),
        ]
        for dn, name, uuid in records:
            self.l.add({"dn": dn, "name": name, "objectUUID": uuid})

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del self.l

    def test_return_type(self):
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        msgs = res.msgs

    def test_get_controls(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        controls = res.controls

    def test_get_referals(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        referals = res.referals

    def test_iter_msgs(self):
        found = False
        for l in self.l.search().msgs:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        # Only checks that the controls are iterable.
        res = self.l.search().controls
        it = iter(res)

    def test_create_control(self):
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        # Only checks that the referrals are iterable.
        res = self.l.search().referals
        it = iter(res)

    def test_search_sequence_msgs(self):
        # Deliberately indexes the messages one by one to exercise
        # len() and item access on the sequence.
        found = False
        res = self.l.search().msgs

        for i in range(0, len(res)):
            l = res[i]
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        found = False
        res = self.l.search()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        found = False
        res = self.l.search_iterator()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction
    def test_search_against_trans(self):
        found11 = False

        # r1/w1 carries messages from the child to the parent,
        # r2/w2 from the parent to the child.
        (r1, w1) = os.pipe()

        (r2, w2) = os.pipe()

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del self.l
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        # found11 is never set to True in this test, so this assertion
        # cannot fail.
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1 carries messages from the child to the parent,
        # r2/w2 from the parent to the child.
        (r1, w1) = os.pipe()

        (r2, w2) = os.pipe()

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del res
            del self.l
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
2878
2879
class LdbResultTestsLmdb(LdbResultTests):
    """Runs the inherited LdbResultTests cases with the mdb:// prefix."""

    def setUp(self):
        # Both attributes have to exist before the parent setUp runs:
        # it adds self.index to the new database, and the base class
        # keeps any prefix that is already set.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        # No extra cleanup beyond the parent's.
        super(LdbResultTestsLmdb, self).tearDown()
2889
2890
class BadTypeTests(TestCase):
    """Checks that wrongly typed arguments are rejected with TypeError."""

    def test_control(self):
        l = ldb.Ldb()
        self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
        # Pass a real Ldb object, so that the non-string control data is
        # the only badly typed argument in this call.
        self.assertRaises(TypeError, ldb.Control, l, 1234)

    def test_modify(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        m = ldb.Message(dn)
        self.assertRaises(TypeError, l.modify, '<bad type>')
        self.assertRaises(TypeError, l.modify, m, '<bad type>')

    def test_add(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        m = ldb.Message(dn)
        self.assertRaises(TypeError, l.add, '<bad type>')
        self.assertRaises(TypeError, l.add, m, '<bad type>')

    def test_delete(self):
        # NOTE(review): this calls l.add although the test is named for
        # delete -- presumably a copy/paste slip; confirm how l.delete
        # treats a str argument before switching.
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        self.assertRaises(TypeError, l.add, '<bad type>')
        self.assertRaises(TypeError, l.add, dn, '<bad type>')

    def test_rename(self):
        # NOTE(review): this calls l.add although the test is named for
        # rename -- presumably a copy/paste slip; confirm how l.rename
        # treats str arguments before switching.
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        self.assertRaises(TypeError, l.add, '<bad type>', dn)
        self.assertRaises(TypeError, l.add, dn, '<bad type>')
        self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')

    def test_search(self):
        # One badly typed keyword argument per call.
        l = ldb.Ldb()
        self.assertRaises(TypeError, l.search, base=1234)
        self.assertRaises(TypeError, l.search, scope='<bad type>')
        self.assertRaises(TypeError, l.search, expression=1234)
        self.assertRaises(TypeError, l.search, attrs='<bad type>')
        self.assertRaises(TypeError, l.search, controls='<bad type>')
2931
2932
class VersionTests(TestCase):
    """Checks on the ldb module's version attribute."""

    def test_version(self):
        # __version__ must be a native str.
        version = ldb.__version__
        self.assertTrue(isinstance(version, str))
2937
2938
if __name__ == '__main__':
    # Run every test in this module when executed as a script;
    # unittest.main is the same object as unittest.TestProgram.
    import unittest
    unittest.main()