lib/ldb/tests: add test for ldb.Dn passed utf8 unicode
[amitay/samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the interpreter's major version is 3 or later.
PY3 = sys.version_info[0] >= 3

# URL prefixes prepended to the database filename by LdbBaseTest.url().
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"

# Index record that the mdb-prefixed test classes add to a new database.
MDB_INDEX_OBJ = {
    "dn": "@INDEXLIST",
    "@IDXONE": [b"1"],
    "@IDXGUID": [b"objectUUID"],
    "@IDX_DN_GUID": [b"GUID"],
}
24
25
def tempdir():
    """Create a new temporary directory and return its path.

    The directory is created under ``$SELFTEST_PREFIX/tmp`` when the
    SELFTEST_PREFIX environment variable is set; otherwise ``dir=None`` is
    handed to tempfile.mkdtemp().
    """
    import tempfile
    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
33
34
class NoContextTests(TestCase):
    """Tests of module-level ldb functions that need no Ldb connection."""

    def test_valid_attr_name(self):
        # "foo" is accepted as an attribute name, "24foo" is not.
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        # Integer timestamps are rendered as ldb time strings.
        for expected, seconds in (("19700101000000.0Z", 0),
                                  ("20071119191012.0Z", 1195499412)):
            self.assertEqual(expected, ldb.timestring(seconds))

    def test_string_to_time(self):
        # The inverse of test_timestring: time strings back to integers.
        for expected, text in ((0, "19700101000000.0Z"),
                               (1195499412, "20071119191012.0Z")):
            self.assertEqual(expected, ldb.string_to_time(text))

    def test_binary_encode(self):
        # binary_decode() must undo binary_encode().
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, raw)

        # Text input must encode to the same value as the bytes input.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
56
57
class LdbBaseTest(TestCase):
    """Shared base for tests that open an ldb file through a prefixed URL.

    url() reads ``self.filename``, which this class never sets, so
    subclasses have to provide it. A subclass may also set ``self.prefix``
    before calling setUp() to pick a prefix other than TDB_PREFIX.
    """

    def setUp(self):
        super(LdbBaseTest, self).setUp()
        # Default to the tdb prefix when none was set, or it is None.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the database URL: the prefix followed by the filename."""
        scheme = self.prefix
        path = self.filename
        return scheme + path

    def flags(self):
        """Return ldb.FLG_NOSYNC for the mdb prefix and 0 otherwise."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
78
79
80 class SimpleLdb(LdbBaseTest):
81
82     def setUp(self):
83         super(SimpleLdb, self).setUp()
84         self.testdir = tempdir()
85         self.filename = os.path.join(self.testdir, "test.ldb")
86         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
87         try:
88             self.ldb.add(self.index)
89         except AttributeError:
90             pass
91
92     def tearDown(self):
93         shutil.rmtree(self.testdir)
94         super(SimpleLdb, self).tearDown()
95         # Ensure the LDB is closed now, so we close the FD
96         del(self.ldb)
97
98     def test_connect(self):
99         ldb.Ldb(self.url(), flags=self.flags())
100
    def test_connect_none(self):
        # An Ldb object can be constructed without any URL.
        ldb.Ldb()
103
104     def test_connect_later(self):
105         x = ldb.Ldb()
106         x.connect(self.url(), flags=self.flags())
107
108     def test_repr(self):
109         x = ldb.Ldb()
110         self.assertTrue(repr(x).startswith("<ldb connection"))
111
112     def test_set_create_perms(self):
113         x = ldb.Ldb()
114         x.set_create_perms(0o600)
115
116     def test_modules_none(self):
117         x = ldb.Ldb()
118         self.assertEqual([], x.modules())
119
120     def test_modules_tdb(self):
121         x = ldb.Ldb(self.url(), flags=self.flags())
122         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
123
124     def test_firstmodule_none(self):
125         x = ldb.Ldb()
126         self.assertEqual(x.firstmodule, None)
127
128     def test_firstmodule_tdb(self):
129         x = ldb.Ldb(self.url(), flags=self.flags())
130         mod = x.firstmodule
131         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
132
133     def test_search(self):
134         l = ldb.Ldb(self.url(), flags=self.flags())
135         self.assertEqual(len(l.search()), 0)
136
137     def test_search_controls(self):
138         l = ldb.Ldb(self.url(), flags=self.flags())
139         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
140
141     def test_utf8_ldb_Dn(self):
142         l = ldb.Ldb(self.url(), flags=self.flags())
143         dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
144
145     def test_search_attrs(self):
146         l = ldb.Ldb(self.url(), flags=self.flags())
147         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
148
149     def test_search_string_dn(self):
150         l = ldb.Ldb(self.url(), flags=self.flags())
151         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
152
153     def test_search_attr_string(self):
154         l = ldb.Ldb(self.url(), flags=self.flags())
155         self.assertRaises(TypeError, l.search, attrs="dc")
156         self.assertRaises(TypeError, l.search, attrs=b"dc")
157
158     def test_opaque(self):
159         l = ldb.Ldb(self.url(), flags=self.flags())
160         l.set_opaque("my_opaque", l)
161         self.assertTrue(l.get_opaque("my_opaque") is not None)
162         self.assertEqual(None, l.get_opaque("unknown"))
163
164     def test_search_scope_base_empty_db(self):
165         l = ldb.Ldb(self.url(), flags=self.flags())
166         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
167                                       ldb.SCOPE_BASE)), 0)
168
169     def test_search_scope_onelevel_empty_db(self):
170         l = ldb.Ldb(self.url(), flags=self.flags())
171         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
172                                       ldb.SCOPE_ONELEVEL)), 0)
173
174     def test_delete(self):
175         l = ldb.Ldb(self.url(), flags=self.flags())
176         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
177
178     def test_delete_w_unhandled_ctrl(self):
179         l = ldb.Ldb(self.url(), flags=self.flags())
180         m = ldb.Message()
181         m.dn = ldb.Dn(l, "dc=foo1")
182         m["b"] = [b"a"]
183         m["objectUUID"] = b"0123456789abcdef"
184         l.add(m)
185         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
186         l.delete(m.dn)
187
188     def test_contains(self):
189         name = self.url()
190         l = ldb.Ldb(name, flags=self.flags())
191         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
192         l = ldb.Ldb(name, flags=self.flags())
193         m = ldb.Message()
194         m.dn = ldb.Dn(l, "dc=foo3")
195         m["b"] = ["a"]
196         m["objectUUID"] = b"0123456789abcdef"
197         l.add(m)
198         try:
199             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
200             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
201         finally:
202             l.delete(m.dn)
203
204     def test_get_config_basedn(self):
205         l = ldb.Ldb(self.url(), flags=self.flags())
206         self.assertEqual(None, l.get_config_basedn())
207
208     def test_get_root_basedn(self):
209         l = ldb.Ldb(self.url(), flags=self.flags())
210         self.assertEqual(None, l.get_root_basedn())
211
212     def test_get_schema_basedn(self):
213         l = ldb.Ldb(self.url(), flags=self.flags())
214         self.assertEqual(None, l.get_schema_basedn())
215
216     def test_get_default_basedn(self):
217         l = ldb.Ldb(self.url(), flags=self.flags())
218         self.assertEqual(None, l.get_default_basedn())
219
220     def test_add(self):
221         l = ldb.Ldb(self.url(), flags=self.flags())
222         m = ldb.Message()
223         m.dn = ldb.Dn(l, "dc=foo4")
224         m["bla"] = b"bla"
225         m["objectUUID"] = b"0123456789abcdef"
226         self.assertEqual(len(l.search()), 0)
227         l.add(m)
228         try:
229             self.assertEqual(len(l.search()), 1)
230         finally:
231             l.delete(ldb.Dn(l, "dc=foo4"))
232
233     def test_search_iterator(self):
234         l = ldb.Ldb(self.url(), flags=self.flags())
235         s = l.search_iterator()
236         s.abandon()
237         try:
238             for me in s:
239                 self.fail()
240             self.fail()
241         except RuntimeError as re:
242             pass
243         try:
244             s.abandon()
245             self.fail()
246         except RuntimeError as re:
247             pass
248         try:
249             s.result()
250             self.fail()
251         except RuntimeError as re:
252             pass
253
254         s = l.search_iterator()
255         count = 0
256         for me in s:
257             self.assertTrue(isinstance(me, ldb.Message))
258             count += 1
259         r = s.result()
260         self.assertEqual(len(r), 0)
261         self.assertEqual(count, 0)
262
263         m1 = ldb.Message()
264         m1.dn = ldb.Dn(l, "dc=foo4")
265         m1["bla"] = b"bla"
266         m1["objectUUID"] = b"0123456789abcdef"
267         l.add(m1)
268         try:
269             s = l.search_iterator()
270             msgs = []
271             for me in s:
272                 self.assertTrue(isinstance(me, ldb.Message))
273                 count += 1
274                 msgs.append(me)
275             r = s.result()
276             self.assertEqual(len(r), 0)
277             self.assertEqual(len(msgs), 1)
278             self.assertEqual(msgs[0].dn, m1.dn)
279
280             m2 = ldb.Message()
281             m2.dn = ldb.Dn(l, "dc=foo5")
282             m2["bla"] = b"bla"
283             m2["objectUUID"] = b"0123456789abcdee"
284             l.add(m2)
285
286             s = l.search_iterator()
287             msgs = []
288             for me in s:
289                 self.assertTrue(isinstance(me, ldb.Message))
290                 count += 1
291                 msgs.append(me)
292             r = s.result()
293             self.assertEqual(len(r), 0)
294             self.assertEqual(len(msgs), 2)
295             if msgs[0].dn == m1.dn:
296                 self.assertEqual(msgs[0].dn, m1.dn)
297                 self.assertEqual(msgs[1].dn, m2.dn)
298             else:
299                 self.assertEqual(msgs[0].dn, m2.dn)
300                 self.assertEqual(msgs[1].dn, m1.dn)
301
302             s = l.search_iterator()
303             msgs = []
304             for me in s:
305                 self.assertTrue(isinstance(me, ldb.Message))
306                 count += 1
307                 msgs.append(me)
308                 break
309             try:
310                 s.result()
311                 self.fail()
312             except RuntimeError as re:
313                 pass
314             for me in s:
315                 self.assertTrue(isinstance(me, ldb.Message))
316                 count += 1
317                 msgs.append(me)
318                 break
319             for me in s:
320                 self.fail()
321
322             r = s.result()
323             self.assertEqual(len(r), 0)
324             self.assertEqual(len(msgs), 2)
325             if msgs[0].dn == m1.dn:
326                 self.assertEqual(msgs[0].dn, m1.dn)
327                 self.assertEqual(msgs[1].dn, m2.dn)
328             else:
329                 self.assertEqual(msgs[0].dn, m2.dn)
330                 self.assertEqual(msgs[1].dn, m1.dn)
331         finally:
332             l.delete(ldb.Dn(l, "dc=foo4"))
333             l.delete(ldb.Dn(l, "dc=foo5"))
334
335     def test_add_text(self):
336         l = ldb.Ldb(self.url(), flags=self.flags())
337         m = ldb.Message()
338         m.dn = ldb.Dn(l, "dc=foo4")
339         m["bla"] = "bla"
340         m["objectUUID"] = b"0123456789abcdef"
341         self.assertEqual(len(l.search()), 0)
342         l.add(m)
343         try:
344             self.assertEqual(len(l.search()), 1)
345         finally:
346             l.delete(ldb.Dn(l, "dc=foo4"))
347
348     def test_add_w_unhandled_ctrl(self):
349         l = ldb.Ldb(self.url(), flags=self.flags())
350         m = ldb.Message()
351         m.dn = ldb.Dn(l, "dc=foo4")
352         m["bla"] = b"bla"
353         self.assertEqual(len(l.search()), 0)
354         self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
355
356     def test_add_dict(self):
357         l = ldb.Ldb(self.url(), flags=self.flags())
358         m = {"dn": ldb.Dn(l, "dc=foo5"),
359              "bla": b"bla",
360              "objectUUID": b"0123456789abcdef"}
361         self.assertEqual(len(l.search()), 0)
362         l.add(m)
363         try:
364             self.assertEqual(len(l.search()), 1)
365         finally:
366             l.delete(ldb.Dn(l, "dc=foo5"))
367
368     def test_add_dict_text(self):
369         l = ldb.Ldb(self.url(), flags=self.flags())
370         m = {"dn": ldb.Dn(l, "dc=foo5"),
371              "bla": "bla",
372              "objectUUID": b"0123456789abcdef"}
373         self.assertEqual(len(l.search()), 0)
374         l.add(m)
375         try:
376             self.assertEqual(len(l.search()), 1)
377         finally:
378             l.delete(ldb.Dn(l, "dc=foo5"))
379
380     def test_add_dict_string_dn(self):
381         l = ldb.Ldb(self.url(), flags=self.flags())
382         m = {"dn": "dc=foo6", "bla": b"bla",
383              "objectUUID": b"0123456789abcdef"}
384         self.assertEqual(len(l.search()), 0)
385         l.add(m)
386         try:
387             self.assertEqual(len(l.search()), 1)
388         finally:
389             l.delete(ldb.Dn(l, "dc=foo6"))
390
391     def test_add_dict_bytes_dn(self):
392         l = ldb.Ldb(self.url(), flags=self.flags())
393         m = {"dn": b"dc=foo6", "bla": b"bla",
394              "objectUUID": b"0123456789abcdef"}
395         self.assertEqual(len(l.search()), 0)
396         l.add(m)
397         try:
398             self.assertEqual(len(l.search()), 1)
399         finally:
400             l.delete(ldb.Dn(l, "dc=foo6"))
401
402     def test_rename(self):
403         l = ldb.Ldb(self.url(), flags=self.flags())
404         m = ldb.Message()
405         m.dn = ldb.Dn(l, "dc=foo7")
406         m["bla"] = b"bla"
407         m["objectUUID"] = b"0123456789abcdef"
408         self.assertEqual(len(l.search()), 0)
409         l.add(m)
410         try:
411             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
412             self.assertEqual(len(l.search()), 1)
413         finally:
414             l.delete(ldb.Dn(l, "dc=bar"))
415
416     def test_rename_string_dns(self):
417         l = ldb.Ldb(self.url(), flags=self.flags())
418         m = ldb.Message()
419         m.dn = ldb.Dn(l, "dc=foo8")
420         m["bla"] = b"bla"
421         m["objectUUID"] = b"0123456789abcdef"
422         self.assertEqual(len(l.search()), 0)
423         l.add(m)
424         self.assertEqual(len(l.search()), 1)
425         try:
426             l.rename("dc=foo8", "dc=bar")
427             self.assertEqual(len(l.search()), 1)
428         finally:
429             l.delete(ldb.Dn(l, "dc=bar"))
430
431     def test_rename_bad_string_dns(self):
432         l = ldb.Ldb(self.url(), flags=self.flags())
433         m = ldb.Message()
434         m.dn = ldb.Dn(l, "dc=foo8")
435         m["bla"] = b"bla"
436         m["objectUUID"] = b"0123456789abcdef"
437         self.assertEqual(len(l.search()), 0)
438         l.add(m)
439         self.assertEqual(len(l.search()), 1)
440         self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
441         self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
442         l.delete(ldb.Dn(l, "dc=foo8"))
443
444     def test_empty_dn(self):
445         l = ldb.Ldb(self.url(), flags=self.flags())
446         self.assertEqual(0, len(l.search()))
447         m = ldb.Message()
448         m.dn = ldb.Dn(l, "dc=empty")
449         m["objectUUID"] = b"0123456789abcdef"
450         l.add(m)
451         rm = l.search()
452         self.assertEqual(1, len(rm))
453         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
454                          set(rm[0].keys()))
455
456         rm = l.search(m.dn)
457         self.assertEqual(1, len(rm))
458         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
459                          set(rm[0].keys()))
460         rm = l.search(m.dn, attrs=["blah"])
461         self.assertEqual(1, len(rm))
462         self.assertEqual(0, len(rm[0]))
463
464     def test_modify_delete(self):
465         l = ldb.Ldb(self.url(), flags=self.flags())
466         m = ldb.Message()
467         m.dn = ldb.Dn(l, "dc=modifydelete")
468         m["bla"] = [b"1234"]
469         m["objectUUID"] = b"0123456789abcdef"
470         l.add(m)
471         rm = l.search(m.dn)[0]
472         self.assertEqual([b"1234"], list(rm["bla"]))
473         try:
474             m = ldb.Message()
475             m.dn = ldb.Dn(l, "dc=modifydelete")
476             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
477             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
478             l.modify(m)
479             rm = l.search(m.dn)
480             self.assertEqual(1, len(rm))
481             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
482                              set(rm[0].keys()))
483             rm = l.search(m.dn, attrs=["bla"])
484             self.assertEqual(1, len(rm))
485             self.assertEqual(0, len(rm[0]))
486         finally:
487             l.delete(ldb.Dn(l, "dc=modifydelete"))
488
489     def test_modify_delete_text(self):
490         l = ldb.Ldb(self.url(), flags=self.flags())
491         m = ldb.Message()
492         m.dn = ldb.Dn(l, "dc=modifydelete")
493         m.text["bla"] = ["1234"]
494         m["objectUUID"] = b"0123456789abcdef"
495         l.add(m)
496         rm = l.search(m.dn)[0]
497         self.assertEqual(["1234"], list(rm.text["bla"]))
498         try:
499             m = ldb.Message()
500             m.dn = ldb.Dn(l, "dc=modifydelete")
501             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
502             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
503             l.modify(m)
504             rm = l.search(m.dn)
505             self.assertEqual(1, len(rm))
506             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
507                              set(rm[0].keys()))
508             rm = l.search(m.dn, attrs=["bla"])
509             self.assertEqual(1, len(rm))
510             self.assertEqual(0, len(rm[0]))
511         finally:
512             l.delete(ldb.Dn(l, "dc=modifydelete"))
513
514     def test_modify_add(self):
515         l = ldb.Ldb(self.url(), flags=self.flags())
516         m = ldb.Message()
517         m.dn = ldb.Dn(l, "dc=add")
518         m["bla"] = [b"1234"]
519         m["objectUUID"] = b"0123456789abcdef"
520         l.add(m)
521         try:
522             m = ldb.Message()
523             m.dn = ldb.Dn(l, "dc=add")
524             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
525             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
526             l.modify(m)
527             rm = l.search(m.dn)[0]
528             self.assertEqual(3, len(rm))
529             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
530         finally:
531             l.delete(ldb.Dn(l, "dc=add"))
532
533     def test_modify_add_text(self):
534         l = ldb.Ldb(self.url(), flags=self.flags())
535         m = ldb.Message()
536         m.dn = ldb.Dn(l, "dc=add")
537         m.text["bla"] = ["1234"]
538         m["objectUUID"] = b"0123456789abcdef"
539         l.add(m)
540         try:
541             m = ldb.Message()
542             m.dn = ldb.Dn(l, "dc=add")
543             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
544             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
545             l.modify(m)
546             rm = l.search(m.dn)[0]
547             self.assertEqual(3, len(rm))
548             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
549         finally:
550             l.delete(ldb.Dn(l, "dc=add"))
551
552     def test_modify_replace(self):
553         l = ldb.Ldb(self.url(), flags=self.flags())
554         m = ldb.Message()
555         m.dn = ldb.Dn(l, "dc=modify2")
556         m["bla"] = [b"1234", b"456"]
557         m["objectUUID"] = b"0123456789abcdef"
558         l.add(m)
559         try:
560             m = ldb.Message()
561             m.dn = ldb.Dn(l, "dc=modify2")
562             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
563             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
564             l.modify(m)
565             rm = l.search(m.dn)[0]
566             self.assertEqual(3, len(rm))
567             self.assertEqual([b"789"], list(rm["bla"]))
568             rm = l.search(m.dn, attrs=["bla"])[0]
569             self.assertEqual(1, len(rm))
570         finally:
571             l.delete(ldb.Dn(l, "dc=modify2"))
572
573     def test_modify_replace_text(self):
574         l = ldb.Ldb(self.url(), flags=self.flags())
575         m = ldb.Message()
576         m.dn = ldb.Dn(l, "dc=modify2")
577         m.text["bla"] = ["1234", "456"]
578         m["objectUUID"] = b"0123456789abcdef"
579         l.add(m)
580         try:
581             m = ldb.Message()
582             m.dn = ldb.Dn(l, "dc=modify2")
583             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
584             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
585             l.modify(m)
586             rm = l.search(m.dn)[0]
587             self.assertEqual(3, len(rm))
588             self.assertEqual(["789"], list(rm.text["bla"]))
589             rm = l.search(m.dn, attrs=["bla"])[0]
590             self.assertEqual(1, len(rm))
591         finally:
592             l.delete(ldb.Dn(l, "dc=modify2"))
593
594     def test_modify_flags_change(self):
595         l = ldb.Ldb(self.url(), flags=self.flags())
596         m = ldb.Message()
597         m.dn = ldb.Dn(l, "dc=add")
598         m["bla"] = [b"1234"]
599         m["objectUUID"] = b"0123456789abcdef"
600         l.add(m)
601         try:
602             m = ldb.Message()
603             m.dn = ldb.Dn(l, "dc=add")
604             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
605             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
606             l.modify(m)
607             rm = l.search(m.dn)[0]
608             self.assertEqual(3, len(rm))
609             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
610
611             # Now create another modify, but switch the flags before we do it
612             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
613             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
614             l.modify(m)
615             rm = l.search(m.dn, attrs=["bla"])[0]
616             self.assertEqual(1, len(rm))
617             self.assertEqual([b"1234"], list(rm["bla"]))
618         finally:
619             l.delete(ldb.Dn(l, "dc=add"))
620
621     def test_modify_flags_change_text(self):
622         l = ldb.Ldb(self.url(), flags=self.flags())
623         m = ldb.Message()
624         m.dn = ldb.Dn(l, "dc=add")
625         m.text["bla"] = ["1234"]
626         m["objectUUID"] = b"0123456789abcdef"
627         l.add(m)
628         try:
629             m = ldb.Message()
630             m.dn = ldb.Dn(l, "dc=add")
631             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
632             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
633             l.modify(m)
634             rm = l.search(m.dn)[0]
635             self.assertEqual(3, len(rm))
636             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
637
638             # Now create another modify, but switch the flags before we do it
639             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
640             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
641             l.modify(m)
642             rm = l.search(m.dn, attrs=["bla"])[0]
643             self.assertEqual(1, len(rm))
644             self.assertEqual(["1234"], list(rm.text["bla"]))
645         finally:
646             l.delete(ldb.Dn(l, "dc=add"))
647
648     def test_transaction_commit(self):
649         l = ldb.Ldb(self.url(), flags=self.flags())
650         l.transaction_start()
651         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
652         m["foo"] = [b"bar"]
653         m["objectUUID"] = b"0123456789abcdef"
654         l.add(m)
655         l.transaction_commit()
656         l.delete(m.dn)
657
658     def test_transaction_cancel(self):
659         l = ldb.Ldb(self.url(), flags=self.flags())
660         l.transaction_start()
661         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
662         m["foo"] = [b"bar"]
663         m["objectUUID"] = b"0123456789abcdee"
664         l.add(m)
665         l.transaction_cancel()
666         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
667
668     def test_set_debug(self):
669         def my_report_fn(level, text):
670             pass
671         l = ldb.Ldb(self.url(), flags=self.flags())
672         l.set_debug(my_report_fn)
673
674     def test_zero_byte_string(self):
675         """Testing we do not get trapped in the \0 byte in a property string."""
676         l = ldb.Ldb(self.url(), flags=self.flags())
677         l.add({
678             "dn": b"dc=somedn",
679             "objectclass": b"user",
680             "cN": b"LDAPtestUSER",
681             "givenname": b"ldap",
682             "displayname": b"foo\0bar",
683             "objectUUID": b"0123456789abcdef"
684         })
685         res = l.search(expression="(dn=dc=somedn)")
686         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
687
688     def test_no_crash_broken_expr(self):
689         l = ldb.Ldb(self.url(), flags=self.flags())
690         self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
691
692 # Run the SimpleLdb tests against an lmdb backend
693
694
class SimpleLdbLmdb(SimpleLdb):
    """Re-runs the inherited SimpleLdb tests with the mdb:// URL prefix."""

    def setUp(self):
        # Both attributes must be set before the inherited setUp() runs,
        # because it reads them when it opens the database.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SimpleLdbLmdb, self).setUp()

    def tearDown(self):
        super(SimpleLdbLmdb, self).tearDown()
704
705
706 class SearchTests(LdbBaseTest):
707     def tearDown(self):
708         shutil.rmtree(self.testdir)
709         super(SearchTests, self).tearDown()
710
711         # Ensure the LDB is closed now, so we close the FD
712         del(self.l)
713
714     def setUp(self):
715         super(SearchTests, self).setUp()
716         self.testdir = tempdir()
717         self.filename = os.path.join(self.testdir, "search_test.ldb")
718         options = ["modules:rdn_name"]
719         if hasattr(self, 'IDXCHECK'):
720             options.append("disable_full_db_scan_for_self_test:1")
721         self.l = ldb.Ldb(self.url(),
722                          flags=self.flags(),
723                          options=options)
724         try:
725             self.l.add(self.index)
726         except AttributeError:
727             pass
728
729         self.l.add({"dn": "@ATTRIBUTES",
730                     "DC": "CASE_INSENSITIVE"})
731
732         # Note that we can't use the name objectGUID here, as we
733         # want to stay clear of the objectGUID handler in LDB and
734         # instead use just the 16 bytes raw, which we just keep
735         # to printable chars here for ease of handling.
736
737         self.l.add({"dn": "DC=SAMBA,DC=ORG",
738                     "name": b"samba.org",
739                     "objectUUID": b"0123456789abcdef"})
740         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
741                     "name": b"Admins",
742                     "x": "z", "y": "a",
743                     "objectUUID": b"0123456789abcde1"})
744         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
745                     "name": b"Users",
746                     "x": "z", "y": "a",
747                     "objectUUID": b"0123456789abcde2"})
748         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
749                     "name": b"OU #1",
750                     "x": "y", "y": "a",
751                     "objectUUID": b"0123456789abcde3"})
752         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
753                     "name": b"OU #2",
754                     "x": "y", "y": "a",
755                     "objectUUID": b"0123456789abcde4"})
756         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
757                     "name": b"OU #3",
758                     "x": "y", "y": "a",
759                     "objectUUID": b"0123456789abcde5"})
760         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
761                     "name": b"OU #4",
762                     "x": "y", "y": "a",
763                     "objectUUID": b"0123456789abcde6"})
764         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
765                     "name": b"OU #5",
766                     "x": "y", "y": "a",
767                     "objectUUID": b"0123456789abcde7"})
768         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
769                     "name": b"OU #6",
770                     "x": "y", "y": "a",
771                     "objectUUID": b"0123456789abcde8"})
772         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
773                     "name": b"OU #7",
774                     "x": "y", "y": "a",
775                     "objectUUID": b"0123456789abcde9"})
776         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
777                     "name": b"OU #8",
778                     "x": "y", "y": "a",
779                     "objectUUID": b"0123456789abcde0"})
780         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
781                     "name": b"OU #9",
782                     "x": "y", "y": "a",
783                     "objectUUID": b"0123456789abcdea"})
784         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
785                     "name": b"OU #10",
786                     "x": "y", "y": "a",
787                     "objectUUID": b"0123456789abcdeb"})
788         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
789                     "name": b"OU #10",
790                     "x": "y", "y": "a",
791                     "objectUUID": b"0123456789abcdec"})
792         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
793                     "name": b"OU #10",
794                     "x": "y", "y": "b",
795                     "objectUUID": b"0123456789abcded"})
796         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
797                     "name": b"OU #10",
798                     "x": "x", "y": "b",
799                     "objectUUID": b"0123456789abcdee"})
800         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
801                     "name": b"OU #10",
802                     "x": "x", "y": "b",
803                     "objectUUID": b"0123456789abcd01"})
804         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
805                     "name": b"OU #10",
806                     "x": "x", "y": "b",
807                     "objectUUID": b"0123456789abcd02"})
808         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
809                     "name": b"OU #10",
810                     "x": "x", "y": "b",
811                     "objectUUID": b"0123456789abcd03"})
812         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
813                     "name": b"OU #10",
814                     "x": "x", "y": "b",
815                     "objectUUID": b"0123456789abcd04"})
816         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
817                     "name": b"OU #10",
818                     "x": "x", "y": "b",
819                     "objectUUID": b"0123456789abcd05"})
820         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
821                     "name": b"OU #10",
822                     "x": "x", "y": "b",
823                     "objectUUID": b"0123456789abcd06"})
824         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
825                     "name": b"OU #10",
826                     "x": "x", "y": "b",
827                     "objectUUID": b"0123456789abcd07"})
828         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
829                     "name": b"OU #10",
830                     "x": "x", "y": "c",
831                     "objectUUID": b"0123456789abcd08"})
832         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
833                     "name": b"OU #10",
834                     "x": "x", "y": "c",
835                     "objectUUID": b"0123456789abcd09"})
836
837     def test_base(self):
838         """Testing a search"""
839
840         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
841                               scope=ldb.SCOPE_BASE)
842         self.assertEqual(len(res11), 1)
843
844     def test_base_lower(self):
845         """Testing a search"""
846
847         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
848                               scope=ldb.SCOPE_BASE)
849         self.assertEqual(len(res11), 1)
850
851     def test_base_or(self):
852         """Testing a search"""
853
854         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
855                               scope=ldb.SCOPE_BASE,
856                               expression="(|(ou=ou11)(ou=ou12))")
857         self.assertEqual(len(res11), 1)
858
859     def test_base_or2(self):
860         """Testing a search"""
861
862         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
863                               scope=ldb.SCOPE_BASE,
864                               expression="(|(x=y)(y=b))")
865         self.assertEqual(len(res11), 1)
866
867     def test_base_and(self):
868         """Testing a search"""
869
870         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
871                               scope=ldb.SCOPE_BASE,
872                               expression="(&(ou=ou11)(ou=ou12))")
873         self.assertEqual(len(res11), 0)
874
875     def test_base_and2(self):
876         """Testing a search"""
877
878         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
879                               scope=ldb.SCOPE_BASE,
880                               expression="(&(x=y)(y=a))")
881         self.assertEqual(len(res11), 1)
882
883     def test_base_false(self):
884         """Testing a search"""
885
886         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
887                               scope=ldb.SCOPE_BASE,
888                               expression="(|(ou=ou13)(ou=ou12))")
889         self.assertEqual(len(res11), 0)
890
891     def test_check_base_false(self):
892         """Testing a search"""
893         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
894                               scope=ldb.SCOPE_BASE,
895                               expression="(|(ou=ou13)(ou=ou12))")
896         self.assertEqual(len(res11), 0)
897
898     def test_check_base_error(self):
899         """Testing a search"""
900         checkbaseonsearch = {"dn": "@OPTIONS",
901                              "checkBaseOnSearch": b"TRUE"}
902         try:
903             self.l.add(checkbaseonsearch)
904         except ldb.LdbError as err:
905             enum = err.args[0]
906             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
907             m = ldb.Message.from_dict(self.l,
908                                       checkbaseonsearch)
909             self.l.modify(m)
910
911         try:
912             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
913                                   scope=ldb.SCOPE_BASE,
914                                   expression="(|(ou=ou13)(ou=ou12))")
915             self.fail("Should have failed on missing base")
916         except ldb.LdbError as err:
917             enum = err.args[0]
918             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
919
920     def test_subtree_and(self):
921         """Testing a search"""
922
923         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
924                               scope=ldb.SCOPE_SUBTREE,
925                               expression="(&(ou=ou11)(ou=ou12))")
926         self.assertEqual(len(res11), 0)
927
928     def test_subtree_and2(self):
929         """Testing a search"""
930
931         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
932                               scope=ldb.SCOPE_SUBTREE,
933                               expression="(&(x=y)(|(y=b)(y=c)))")
934         self.assertEqual(len(res11), 1)
935
936     def test_subtree_and2_lower(self):
937         """Testing a search"""
938
939         res11 = self.l.search(base="DC=samba,DC=org",
940                               scope=ldb.SCOPE_SUBTREE,
941                               expression="(&(x=y)(|(y=b)(y=c)))")
942         self.assertEqual(len(res11), 1)
943
944     def test_subtree_or(self):
945         """Testing a search"""
946
947         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
948                               scope=ldb.SCOPE_SUBTREE,
949                               expression="(|(ou=ou11)(ou=ou12))")
950         self.assertEqual(len(res11), 2)
951
952     def test_subtree_or2(self):
953         """Testing a search"""
954
955         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
956                               scope=ldb.SCOPE_SUBTREE,
957                               expression="(|(x=y)(y=b))")
958         self.assertEqual(len(res11), 20)
959
960     def test_subtree_or3(self):
961         """Testing a search"""
962
963         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
964                               scope=ldb.SCOPE_SUBTREE,
965                               expression="(|(x=y)(y=b)(y=c))")
966         self.assertEqual(len(res11), 22)
967
968     def test_one_and(self):
969         """Testing a search"""
970
971         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
972                               scope=ldb.SCOPE_ONELEVEL,
973                               expression="(&(ou=ou11)(ou=ou12))")
974         self.assertEqual(len(res11), 0)
975
976     def test_one_and2(self):
977         """Testing a search"""
978
979         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
980                               scope=ldb.SCOPE_ONELEVEL,
981                               expression="(&(x=y)(y=b))")
982         self.assertEqual(len(res11), 1)
983
984     def test_one_or(self):
985         """Testing a search"""
986
987         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
988                               scope=ldb.SCOPE_ONELEVEL,
989                               expression="(|(ou=ou11)(ou=ou12))")
990         self.assertEqual(len(res11), 2)
991
992     def test_one_or2(self):
993         """Testing a search"""
994
995         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
996                               scope=ldb.SCOPE_ONELEVEL,
997                               expression="(|(x=y)(y=b))")
998         self.assertEqual(len(res11), 20)
999
1000     def test_one_or2_lower(self):
1001         """Testing a search"""
1002
1003         res11 = self.l.search(base="DC=samba,DC=org",
1004                               scope=ldb.SCOPE_ONELEVEL,
1005                               expression="(|(x=y)(y=b))")
1006         self.assertEqual(len(res11), 20)
1007
1008     def test_one_unindexable(self):
1009         """Testing a search"""
1010
1011         try:
1012             res11 = self.l.search(base="DC=samba,DC=org",
1013                                   scope=ldb.SCOPE_ONELEVEL,
1014                                   expression="(y=b*)")
1015             if hasattr(self, 'IDX') and \
1016                not hasattr(self, 'IDXONE') and \
1017                hasattr(self, 'IDXCHECK'):
1018                 self.fail("Should have failed as un-indexed search")
1019
1020             self.assertEqual(len(res11), 9)
1021
1022         except ldb.LdbError as err:
1023             enum = err.args[0]
1024             estr = err.args[1]
1025             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1026             self.assertIn(estr, "ldb FULL SEARCH disabled")
1027
1028     def test_one_unindexable_presence(self):
1029         """Testing a search"""
1030
1031         try:
1032             res11 = self.l.search(base="DC=samba,DC=org",
1033                                   scope=ldb.SCOPE_ONELEVEL,
1034                                   expression="(y=*)")
1035             if hasattr(self, 'IDX') and \
1036                not hasattr(self, 'IDXONE') and \
1037                hasattr(self, 'IDXCHECK'):
1038                 self.fail("Should have failed as un-indexed search")
1039
1040             self.assertEqual(len(res11), 24)
1041
1042         except ldb.LdbError as err:
1043             enum = err.args[0]
1044             estr = err.args[1]
1045             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1046             self.assertIn(estr, "ldb FULL SEARCH disabled")
1047
1048     def test_subtree_and_or(self):
1049         """Testing a search"""
1050
1051         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1052                               scope=ldb.SCOPE_SUBTREE,
1053                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1054         self.assertEqual(len(res11), 0)
1055
1056     def test_subtree_and_or2(self):
1057         """Testing a search"""
1058
1059         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1060                               scope=ldb.SCOPE_SUBTREE,
1061                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1062         self.assertEqual(len(res11), 0)
1063
1064     def test_subtree_and_or3(self):
1065         """Testing a search"""
1066
1067         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1068                               scope=ldb.SCOPE_SUBTREE,
1069                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1070         self.assertEqual(len(res11), 2)
1071
1072     def test_subtree_and_or4(self):
1073         """Testing a search"""
1074
1075         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1076                               scope=ldb.SCOPE_SUBTREE,
1077                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1078         self.assertEqual(len(res11), 2)
1079
1080     def test_subtree_and_or5(self):
1081         """Testing a search"""
1082
1083         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1084                               scope=ldb.SCOPE_SUBTREE,
1085                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1086         self.assertEqual(len(res11), 1)
1087
1088     def test_subtree_or_and(self):
1089         """Testing a search"""
1090
1091         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1092                               scope=ldb.SCOPE_SUBTREE,
1093                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1094         self.assertEqual(len(res11), 10)
1095
1096     def test_subtree_large_and_unique(self):
1097         """Testing a search"""
1098
1099         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1100                               scope=ldb.SCOPE_SUBTREE,
1101                               expression="(&(ou=ou10)(y=a))")
1102         self.assertEqual(len(res11), 1)
1103
1104     def test_subtree_and_none(self):
1105         """Testing a search"""
1106
1107         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1108                               scope=ldb.SCOPE_SUBTREE,
1109                               expression="(&(ou=ouX)(y=a))")
1110         self.assertEqual(len(res11), 0)
1111
1112     def test_subtree_and_idx_record(self):
1113         """Testing a search against the index record"""
1114
1115         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1116                               scope=ldb.SCOPE_SUBTREE,
1117                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1118         self.assertEqual(len(res11), 0)
1119
1120     def test_subtree_and_idxone_record(self):
1121         """Testing a search against the index record"""
1122
1123         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1124                               scope=ldb.SCOPE_SUBTREE,
1125                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1126         self.assertEqual(len(res11), 0)
1127
1128     def test_subtree_unindexable(self):
1129         """Testing a search"""
1130
1131         try:
1132             res11 = self.l.search(base="DC=samba,DC=org",
1133                                   scope=ldb.SCOPE_SUBTREE,
1134                                   expression="(y=b*)")
1135             if hasattr(self, 'IDX') and \
1136                hasattr(self, 'IDXCHECK'):
1137                 self.fail("Should have failed as un-indexed search")
1138
1139             self.assertEqual(len(res11), 9)
1140
1141         except ldb.LdbError as err:
1142             enum = err.args[0]
1143             estr = err.args[1]
1144             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1145             self.assertIn(estr, "ldb FULL SEARCH disabled")
1146
1147     def test_subtree_unindexable_presence(self):
1148         """Testing a search"""
1149
1150         try:
1151             res11 = self.l.search(base="DC=samba,DC=org",
1152                                   scope=ldb.SCOPE_SUBTREE,
1153                                   expression="(y=*)")
1154             if hasattr(self, 'IDX') and \
1155                hasattr(self, 'IDXCHECK'):
1156                 self.fail("Should have failed as un-indexed search")
1157
1158             self.assertEqual(len(res11), 24)
1159
1160         except ldb.LdbError as err:
1161             enum = err.args[0]
1162             estr = err.args[1]
1163             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1164             self.assertIn(estr, "ldb FULL SEARCH disabled")
1165
1166     def test_dn_filter_one(self):
1167         """Testing that a dn= filter succeeds
1168         (or fails with disallowDNFilter
1169         set and IDXGUID or (IDX and not IDXONE) mode)
1170         when the scope is SCOPE_ONELEVEL.
1171
1172         This should be made more consistent, but for now lock in
1173         the behaviour
1174
1175         """
1176
1177         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1178                               scope=ldb.SCOPE_ONELEVEL,
1179                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1180         if hasattr(self, 'disallowDNFilter') and \
1181            hasattr(self, 'IDX') and \
1182            (hasattr(self, 'IDXGUID') or
1183             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1184             self.assertEqual(len(res11), 0)
1185         else:
1186             self.assertEqual(len(res11), 1)
1187
1188     def test_dn_filter_subtree(self):
1189         """Testing that a dn= filter succeeds
1190         (or fails with disallowDNFilter set)
1191         when the scope is SCOPE_SUBTREE"""
1192
1193         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1194                               scope=ldb.SCOPE_SUBTREE,
1195                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1196         if hasattr(self, 'disallowDNFilter') \
1197            and hasattr(self, 'IDX'):
1198             self.assertEqual(len(res11), 0)
1199         else:
1200             self.assertEqual(len(res11), 1)
1201
1202     def test_dn_filter_base(self):
1203         """Testing that (incorrectly) a dn= filter works
1204         when the scope is SCOPE_BASE"""
1205
1206         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1207                               scope=ldb.SCOPE_BASE,
1208                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1209
1210         # At some point we should fix this, but it isn't trivial
1211         self.assertEqual(len(res11), 1)
1212
1213     def test_distinguishedName_filter_one(self):
1214         """Testing that a distinguishedName= filter succeeds
1215         when the scope is SCOPE_ONELEVEL.
1216
1217         This should be made more consistent, but for now lock in
1218         the behaviour
1219
1220         """
1221
1222         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1223                               scope=ldb.SCOPE_ONELEVEL,
1224                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1225         self.assertEqual(len(res11), 1)
1226
1227     def test_distinguishedName_filter_subtree(self):
1228         """Testing that a distinguishedName= filter succeeds
1229         when the scope is SCOPE_SUBTREE"""
1230
1231         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1232                               scope=ldb.SCOPE_SUBTREE,
1233                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1234         self.assertEqual(len(res11), 1)
1235
1236     def test_distinguishedName_filter_base(self):
1237         """Testing that (incorrectly) a distinguishedName= filter works
1238         when the scope is SCOPE_BASE"""
1239
1240         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1241                               scope=ldb.SCOPE_BASE,
1242                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1243
1244         # At some point we should fix this, but it isn't trivial
1245         self.assertEqual(len(res11), 1)
1246
1247     def test_bad_dn_filter_base(self):
1248         """Testing that a dn= filter on an invalid DN works
1249         when the scope is SCOPE_BASE but
1250         returns zero results"""
1251
1252         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1253                               scope=ldb.SCOPE_BASE,
1254                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1255
1256         # At some point we should fix this, but it isn't trivial
1257         self.assertEqual(len(res11), 0)
1258
1259
1260     def test_bad_dn_filter_one(self):
1261         """Testing that a dn= filter succeeds but returns zero
1262         results when the DN is not valid on a SCOPE_ONELEVEL search
1263
1264         """
1265
1266         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1267                               scope=ldb.SCOPE_ONELEVEL,
1268                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1269         self.assertEqual(len(res11), 0)
1270
1271     def test_bad_dn_filter_subtree(self):
1272         """Testing that a dn= filter succeeds but returns zero
1273         results when the DN is not valid on a SCOPE_SUBTREE search
1274
1275         """
1276
1277         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1278                               scope=ldb.SCOPE_SUBTREE,
1279                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1280         self.assertEqual(len(res11), 0)
1281
1282     def test_bad_distinguishedName_filter_base(self):
1283         """Testing that a distinguishedName= filter on an invalid DN works
1284         when the scope is SCOPE_BASE but
1285         returns zero results"""
1286
1287         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1288                               scope=ldb.SCOPE_BASE,
1289                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1290
1291         # At some point we should fix this, but it isn't trivial
1292         self.assertEqual(len(res11), 0)
1293
1294
1295     def test_bad_distinguishedName_filter_one(self):
1296         """Testing that a distinguishedName= filter succeeds but returns zero
1297         results when the DN is not valid on a SCOPE_ONELEVEL search
1298
1299         """
1300
1301         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1302                               scope=ldb.SCOPE_ONELEVEL,
1303                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1304         self.assertEqual(len(res11), 0)
1305
1306     def test_bad_distinguishedName_filter_subtree(self):
1307         """Testing that a distinguishedName= filter succeeds but returns zero
1308         results when the DN is not valid on a SCOPE_SUBTREE search
1309
1310         """
1311
1312         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1313                               scope=ldb.SCOPE_SUBTREE,
1314                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1315         self.assertEqual(len(res11), 0)
1316
1317     def test_bad_dn_search_base(self):
1318         """Testing with a bad base DN (SCOPE_BASE)"""
1319
1320         try:
1321             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1322                                   scope=ldb.SCOPE_BASE)
1323             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1324         except ldb.LdbError as err:
1325             enum = err.args[0]
1326             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1327
1328
1329     def test_bad_dn_search_one(self):
1330         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1331
1332         try:
1333             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1334                                   scope=ldb.SCOPE_ONELEVEL)
1335             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1336         except ldb.LdbError as err:
1337             enum = err.args[0]
1338             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1339
1340     def test_bad_dn_search_subtree(self):
1341         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1342
1343         try:
1344             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1345                                   scope=ldb.SCOPE_SUBTREE)
1346             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1347         except ldb.LdbError as err:
1348             enum = err.args[0]
1349             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1350
1351
1352
1353 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """Re-run the inherited SearchTests cases with the "mdb://" URL
       prefix and the MDB_INDEX_OBJ index record"""

    def setUp(self):
        # Both attributes are assigned before delegating; the base
        # setUp consults self.prefix, and self.index is presumably
        # consumed by SearchTests.setUp (not confirmed from here).
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Pure pass-through to the inherited tearDown.
        super(SearchTestsLmdb, self).tearDown()
1363
1364
class IndexedSearchTests(SearchTests):
    """Run the SearchTests cases with an @INDEXLIST record indexing the
       x, y and ou attributes, to check the index does not change
       results"""

    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attribute the test methods probe with hasattr().
        self.IDX = True
1374
1375
class IndexedCheckSearchTests(IndexedSearchTests):
    """IndexedSearchTests variant with the IDXCHECK marker set (full
       scan disabled)"""

    def setUp(self):
        # The marker is set before the parent setUp runs; presumably the
        # parent fixture reads it -- verify against SearchTests.setUp.
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
1383
1384
class IndexedSearchDnFilterTests(SearchTests):
    """Run the SearchTests cases with an attribute index and with
       disallowDNFilter enabled in @OPTIONS"""

    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)

        # Marker attributes the test methods probe with hasattr().
        self.disallowDNFilter = True
        self.IDX = True
1398
1399
class IndexedAndOneLevelSearchTests(SearchTests):
    """Run the SearchTests cases with an attribute index plus @IDXONE,
       to check the index does not change results"""

    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Marker attributes the test methods probe with hasattr().
        self.IDXONE = True
        self.IDX = True
1411
1412
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """IndexedAndOneLevelSearchTests variant with the IDXCHECK marker
       set (full scan disabled)"""

    def setUp(self):
        # The marker is set before the parent setUp runs; presumably the
        # parent fixture reads it -- verify against SearchTests.setUp.
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1420
1421
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Run the SearchTests cases with an attribute index plus @IDXONE,
       and with disallowDNFilter and checkBaseOnSearch enabled in
       @OPTIONS"""

    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)

        # Marker attributes the test methods probe with hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXONE = True
1439
1440
class GUIDIndexedSearchTests(SearchTests):
    """Run the SearchTests cases with an @INDEXLIST record that enables
       a GUID index keyed on objectUUID"""

    def setUp(self):
        # self.index is assigned before the parent setUp runs;
        # presumably the parent writes it to the database -- verify
        # against SearchTests.setUp.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedSearchTests, self).setUp()

        # Marker attributes the test methods probe with hasattr().
        self.IDXONE = True
        self.IDXGUID = True
1454
1455
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Run the SearchTests cases with a GUID index keyed on objectUUID,
       and with disallowDNFilter and checkBaseOnSearch enabled in
       @OPTIONS"""

    def setUp(self):
        # self.index is assigned before the parent setUp runs;
        # presumably the parent writes it to the database -- verify
        # against SearchTests.setUp.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Marker attributes the test methods probe with hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDXGUID = True
        self.IDX = True
1473
1474
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Run the SearchTests cases with a GUID index keyed on objectUUID,
       with disallowDNFilter and checkBaseOnSearch enabled, and with the
       IDXONE marker set"""

    def setUp(self):
        # self.index is assigned before the parent setUp runs;
        # presumably the parent writes it to the database -- verify
        # against SearchTests.setUp.
        # NOTE(review): this record carries no "@IDXONE" key even though
        # the IDXONE marker is set below -- confirm that is intended.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Marker attributes the test methods probe with hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDXONE = True
        self.IDXGUID = True
        self.IDX = True
1493
1494
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """Re-run GUIDIndexedSearchTests with the "mdb://" URL prefix"""

    def setUp(self):
        # Chosen before delegating, because the base setUp consults
        # self.prefix.
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Pure pass-through to the inherited tearDown.
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1503
1504
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """Re-run GUIDIndexedDNFilterSearchTests with the "mdb://" URL
       prefix"""

    def setUp(self):
        # Chosen before delegating, because the base setUp consults
        # self.prefix.
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Pure pass-through to the inherited tearDown.
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1513
1514
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """Re-run GUIDAndOneLevelIndexedSearchTests with the "mdb://" URL
       prefix"""

    def setUp(self):
        # Chosen before delegating, because the base setUp consults
        # self.prefix.
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Pure pass-through to the inherited tearDown.
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1523
1524
1525 class AddModifyTests(LdbBaseTest):
1526     def tearDown(self):
1527         shutil.rmtree(self.testdir)
1528         super(AddModifyTests, self).tearDown()
1529
1530         # Ensure the LDB is closed now, so we close the FD
1531         del(self.l)
1532
1533     def setUp(self):
1534         super(AddModifyTests, self).setUp()
1535         self.testdir = tempdir()
1536         self.filename = os.path.join(self.testdir, "add_test.ldb")
1537         self.l = ldb.Ldb(self.url(),
1538                          flags=self.flags(),
1539                          options=["modules:rdn_name"])
1540         try:
1541             self.l.add(self.index)
1542         except AttributeError:
1543             pass
1544
1545         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1546                     "name": b"samba.org",
1547                     "objectUUID": b"0123456789abcdef"})
1548         self.l.add({"dn": "@ATTRIBUTES",
1549                     "objectUUID": "UNIQUE_INDEX"})
1550
1551     def test_add_dup(self):
1552         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1553                     "name": b"Admins",
1554                     "x": "z", "y": "a",
1555                     "objectUUID": b"0123456789abcde1"})
1556         try:
1557             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1558                         "name": b"Admins",
1559                         "x": "z", "y": "a",
1560                         "objectUUID": b"0123456789abcde2"})
1561             self.fail("Should have failed adding dupliate entry")
1562         except ldb.LdbError as err:
1563             enum = err.args[0]
1564             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1565
1566     def test_add_bad(self):
1567         try:
1568             self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
1569                         "name": b"Admins",
1570                         "x": "z", "y": "a",
1571                         "objectUUID": b"0123456789abcde1"})
1572             self.fail("Should have failed adding entry with invalid DN")
1573         except ldb.LdbError as err:
1574             enum = err.args[0]
1575             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1576
1577     def test_add_del_add(self):
1578         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1579                     "name": b"Admins",
1580                     "x": "z", "y": "a",
1581                     "objectUUID": b"0123456789abcde1"})
1582         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1583         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1584                     "name": b"Admins",
1585                     "x": "z", "y": "a",
1586                     "objectUUID": b"0123456789abcde2"})
1587
1588     def test_add_move_add(self):
1589         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1590                     "name": b"Admins",
1591                     "x": "z", "y": "a",
1592                     "objectUUID": b"0123456789abcde1"})
1593         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1594                       "OU=DUP2,DC=SAMBA,DC=ORG")
1595         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1596                     "name": b"Admins",
1597                     "x": "z", "y": "a",
1598                     "objectUUID": b"0123456789abcde2"})
1599
1600     def test_add_move_fail_move_move(self):
1601         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1602                     "name": b"Admins",
1603                     "x": "z", "y": "a",
1604                     "objectUUID": b"0123456789abcde1"})
1605         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1606                     "name": b"Admins",
1607                     "x": "z", "y": "a",
1608                     "objectUUID": b"0123456789abcde2"})
1609
1610         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1611                              scope=ldb.SCOPE_SUBTREE,
1612                              expression="(objectUUID=0123456789abcde1)")
1613         self.assertEqual(len(res2), 1)
1614         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1615
1616         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1617                              scope=ldb.SCOPE_SUBTREE,
1618                              expression="(objectUUID=0123456789abcde2)")
1619         self.assertEqual(len(res3), 1)
1620         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1621
1622         try:
1623             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1624                           "OU=DUP2,DC=SAMBA,DC=ORG")
1625             self.fail("Should have failed on duplicate DN")
1626         except ldb.LdbError as err:
1627             enum = err.args[0]
1628             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1629
1630         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1631                       "OU=DUP3,DC=SAMBA,DC=ORG")
1632
1633         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1634                       "OU=DUP2,DC=SAMBA,DC=ORG")
1635
1636         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1637                              scope=ldb.SCOPE_SUBTREE,
1638                              expression="(objectUUID=0123456789abcde1)")
1639         self.assertEqual(len(res2), 1)
1640         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1641
1642         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1643                              scope=ldb.SCOPE_SUBTREE,
1644                              expression="(objectUUID=0123456789abcde2)")
1645         self.assertEqual(len(res3), 1)
1646         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1647
1648     def test_move_missing(self):
1649         try:
1650             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1651                           "OU=DUP2,DC=SAMBA,DC=ORG")
1652             self.fail("Should have failed on missing")
1653         except ldb.LdbError as err:
1654             enum = err.args[0]
1655             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1656
1657     def test_move_missing2(self):
1658         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1659                     "name": b"Admins",
1660                     "x": "z", "y": "a",
1661                     "objectUUID": b"0123456789abcde2"})
1662
1663         try:
1664             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1665                           "OU=DUP2,DC=SAMBA,DC=ORG")
1666             self.fail("Should have failed on missing")
1667         except ldb.LdbError as err:
1668             enum = err.args[0]
1669             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1670
1671     def test_move_bad(self):
1672         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1673                     "name": b"Admins",
1674                     "x": "z", "y": "a",
1675                     "objectUUID": b"0123456789abcde2"})
1676
1677         try:
1678             self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
1679                           "OU=DUP2,DC=SAMBA,DC=ORG")
1680             self.fail("Should have failed on invalid DN")
1681         except ldb.LdbError as err:
1682             enum = err.args[0]
1683             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1684
1685     def test_move_bad2(self):
1686         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1687                     "name": b"Admins",
1688                     "x": "z", "y": "a",
1689                     "objectUUID": b"0123456789abcde2"})
1690
1691         try:
1692             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1693                           "OUXDUP2,DC=SAMBA,DC=ORG")
1694             self.fail("Should have failed on missing")
1695         except ldb.LdbError as err:
1696             enum = err.args[0]
1697             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1698
1699     def test_move_fail_move_add(self):
1700         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1701                     "name": b"Admins",
1702                     "x": "z", "y": "a",
1703                     "objectUUID": b"0123456789abcde1"})
1704         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1705                     "name": b"Admins",
1706                     "x": "z", "y": "a",
1707                     "objectUUID": b"0123456789abcde2"})
1708         try:
1709             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1710                           "OU=DUP2,DC=SAMBA,DC=ORG")
1711             self.fail("Should have failed on duplicate DN")
1712         except ldb.LdbError as err:
1713             enum = err.args[0]
1714             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1715
1716         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1717                       "OU=DUP3,DC=SAMBA,DC=ORG")
1718
1719         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1720                     "name": b"Admins",
1721                     "x": "z", "y": "a",
1722                     "objectUUID": b"0123456789abcde3"})
1723
1724
class AddModifyTestsLmdb(AddModifyTests):
    """Run the AddModifyTests cases with the mdb:// URL prefix and the
    MDB_INDEX_OBJ index definition."""

    def setUp(self):
        # Assigned before delegating; presumably the inherited setUp reads
        # both attributes when it opens the database -- verify there.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend-specific to release; defer to the parent.
        super(AddModifyTestsLmdb, self).tearDown()
1734
1735
class IndexedAddModifyTests(AddModifyTests):
    """Repeat the add/modify tests with an attribute index configured, to
       ensure the index doesn't break things"""

    def setUp(self):
        # Subclasses may assign self.index before calling us; only supply
        # the default @INDEXLIST definition when none has been chosen.
        if not hasattr(self, 'index'):
            self.index = {"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                          "@IDXONE": [b"1"]}
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        """Adding objectUUID 0123456789abcdef must be refused with
        ERR_CONSTRAINT_VIOLATION.

        NOTE(review): presumably the inherited setUp already stored an
        entry carrying this objectUUID -- confirm against the base class.
        """
        try:
            self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcdef"})
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding duplicate GUID")

    def test_duplicate_name_dup_GUID(self):
        """Adding the very same DN and objectUUID twice must raise
        ERR_ENTRY_ALREADY_EXISTS."""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"a123456789abcdef"})
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding duplicate GUID")

    def test_duplicate_name_dup_GUID2(self):
        """A rejected duplicate-DN add must not leave its objectUUID behind.

        The second add reuses the DN with a new objectUUID and must raise
        ERR_ENTRY_ALREADY_EXISTS; the same objectUUID is then added under
        a different DN, which must succeed.
        """
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"abc3456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"aaa3456789abcdef"})
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding duplicate DN")

        # Checking the GUID didn't stick in the index
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"aaa3456789abcdef"})

    def test_add_dup_guid_add(self):
        """A DN whose add was refused for a duplicate objectUUID can still
        be added afterwards with a unique objectUUID."""
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde1"})
        try:
            self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcde1"})
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde2"})
1814
1815
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Repeat the indexed add/modify tests with an @INDEXLIST that also
       declares @IDXGUID and @IDX_DN_GUID"""

    def setUp(self):
        # Assigned unconditionally, so the parent's hasattr() check finds
        # an index already in place and keeps this one.
        guid_index = {"dn": "@INDEXLIST"}
        guid_index["@IDXATTR"] = [b"x", b"y", b"ou"]
        guid_index["@IDXONE"] = [b"1"]
        guid_index["@IDXGUID"] = [b"objectUUID"]
        guid_index["@IDX_DN_GUID"] = [b"GUID"]
        self.index = guid_index
        super(GUIDIndexedAddModifyTests, self).setUp()
1827
1828
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Opened once the parent fixture is ready; each test body then
        # runs with this transaction still open.
        database = self.l
        database.transaction_start()

    def tearDown(self):
        # Commit what setUp opened before the parent tears down.
        database = self.l
        database.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
1839
1840
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Opened once the parent fixture is ready; each test body then
        # runs with this transaction still open.
        database = self.l
        database.transaction_start()

    def tearDown(self):
        # Commit what setUp opened before the parent tears down.
        database = self.l
        database.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
1851
1852
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Run the GUID-indexed add/modify tests with the mdb:// URL prefix."""

    def setUp(self):
        # Select the prefix first, then let the parent build the fixture.
        setattr(self, "prefix", MDB_PREFIX)
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend-specific to release; defer to the parent.
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1861
1862
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Run the in-transaction GUID-indexed add/modify tests with the
    mdb:// URL prefix."""

    def setUp(self):
        # Select the prefix first, then let the parent build the fixture.
        setattr(self, "prefix", MDB_PREFIX)
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend-specific to release; defer to the parent.
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1871
1872
class BadIndexTests(LdbBaseTest):
    """Check that searches on an indexed attribute keep working after
    @ATTRIBUTES is changed underneath existing data.

    Subclasses switch to the GUID index layout by setting an ``IDXGUID``
    attribute before calling setUp().
    """

    # (dn, objectUUID) of the three entries every test stores.
    _ENTRIES = (
        ("x=x,dc=samba,dc=org", b"0123456789abcde1"),
        ("x=y,dc=samba,dc=org", b"0123456789abcde2"),
        ("x=z,dc=samba,dc=org", b"0123456789abcde3"),
    )

    def setUp(self):
        # The base setUp is called exactly once; a second, redundant call
        # used to follow the index creation.
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())

        index_list = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"]}
        if hasattr(self, 'IDXGUID'):
            index_list["@IDXGUID"] = [b"objectUUID"]
            index_list["@IDX_DN_GUID"] = [b"GUID"]
        self.ldb.add(index_list)

    def _add_entries(self, y_values):
        """Store the three fixture entries, one 'y' value per entry."""
        for (dn, guid), y in zip(self._ENTRIES, y_values):
            self.ldb.add({"dn": dn,
                          "objectUUID": guid,
                          "y": y})

    def _count_y(self, value):
        """Return the number of results of a (y=<value>) search."""
        res = self.ldb.search(expression="(y=%s)" % value,
                              base="dc=samba,dc=org")
        return len(res)

    def test_unique(self):
        """Declaring 'y' UNIQUE_INDEX over three entries that share y=1
        must fail and leave the existing index usable."""
        self._add_entries(["1", "1", "1"])
        self.assertEqual(self._count_y("1"), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass
        else:
            self.fail("Should have failed setting y to UNIQUE_INDEX")

        # We must still have a working index
        self.assertEqual(self._count_y("1"), 3)

    def test_unique_transaction(self):
        """Same as test_unique, but inside an explicit transaction.

        Any error from the add itself is deliberately ignored; the commit
        must then fail with ERR_OPERATIONS_ERROR.
        """
        self._add_entries(["1", "1", "1"])
        self.assertEqual(self._count_y("1"), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_OPERATIONS_ERROR)
        else:
            self.fail("transaction_commit should have failed")

        # We must still have a working index
        self.assertEqual(self._count_y("1"), 3)

    def _check_casefold_hits(self):
        """Assert the (y=a) result count expected after 'y' has been made
        CASE_INSENSITIVE."""
        hits = self._count_y("a")
        if hasattr(self, 'IDXGUID'):
            self.assertEqual(hits, 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(hits, 4)

    def test_casefold(self):
        """Making 'y' CASE_INSENSITIVE after the data was stored must
        still give a working index."""
        self._add_entries(["a", "A", ["a", "A"]])
        self.assertEqual(self._count_y("a"), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        self._check_casefold_hits()

    def test_casefold_transaction(self):
        """Same as test_casefold, with the @ATTRIBUTES change wrapped in
        an explicit transaction."""
        self._add_entries(["a", "A", ["a", "A"]])
        self.assertEqual(self._count_y("a"), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        self._check_casefold_hits()

    def tearDown(self):
        # NOTE(review): self.testdir is not removed here; presumably the
        # base tearDown (outside this view) deals with it -- confirm.
        super(BadIndexTests, self).tearDown()
2022
2023
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # The parent checks hasattr(self, 'IDXGUID') to decide which
        # @INDEXLIST to write, so the flag must exist before it runs.
        setattr(self, "IDXGUID", True)
        super(GUIDBadIndexTests, self).setUp()
2031
2032
class DnTests(TestCase):
    """Tests for ldb.Dn, using an Ldb() created without arguments."""

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del self.ldb

    def test_set_dn_invalid(self):
        """Assigning a plain string to Message.dn raises TypeError."""
        x = ldb.Message()

        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        """Dns compare equal when built from the same string, unequal
        otherwise."""
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        """str() gives back the string the Dn was built from."""
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(str(x), "dc=foo12,bar=bloe")

    def test_repr(self):
        """repr() wraps the DN string in Dn('...')."""
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(repr(x), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold_value_case_kept(self):
        """get_casefold() on a DN with a 'bar' component.

        The expected string keeps the 'bar' value in its original case.

        This test used to be named test_get_casefold, but a second method
        of that name further down replaced it in the class body, so it
        never ran.  It has a distinct name now so that both execute.
        """
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        """validate() is true for a well-formed DN."""
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        """parent() drops the leading component."""
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", str(x.parent()))

    def test_parent_nonexistent(self):
        """parent() of the special DN @BLA is None."""
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        """is_valid() is true for a normal DN and for the empty DN."""
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        """is_special() is true only for the '@'-prefixed DN."""
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        """check_special() matches a special DN against its full name."""
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        """len() counts the DN components."""
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        """add_child() with a Dn prepends it in place."""
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base(self):
        """add_base() with a Dn appends it in place."""
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add_child_str(self):
        """add_child() also accepts a plain string."""
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base_str(self):
        """add_base() also accepts a plain string."""
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add(self):
        """x + y yields x's components followed by y's."""
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        """remove_base_components(n) strips n trailing components."""
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x) - 1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        """parse_ldif() yields tuples whose second item is the Message;
        write_ldif() turns it back into LDIF text."""
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        """parse_ldif() yields one item per record in the input."""
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        """canonical_str() gives the '/'-separated form."""
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        """canonical_ex_str() puts a newline before the last element."""
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        """get_component_name() returns None for out-of-range indexes."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        """get_component_value() returns None for out-of-range indexes."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # These two used to call get_component_name(), so the value
        # accessor was never exercised with an out-of-range index.
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        """set_component() replaces name and value, escapes ',' and '+',
        and raises TypeError for an index past the end."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        # The failed call must not have modified the DN.
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        """set_component() accepts bytes values."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        """set_component() rejects None as a value with TypeError."""
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        """get_extended_component() is None when the DN has none."""
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        """An extended component parsed from the string is returned as
        bytes."""
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        """set_extended_component() accepts str and bytes; the getter
        returns bytes in both cases."""
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        """extended_str() includes the extended component."""
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        """get_rdn_name() is the attribute name of the first component."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        """get_rdn_value() is the value of the first component."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    def test_get_casefold(self):
        """get_casefold() upper-cases a cn=...,dc=... DN throughout."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        """get_linearized() gives back the DN string."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        """is_null() is true only for the empty DN."""
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
2275
2276
2277 class LdbMsgTests(TestCase):
2278
2279     def setUp(self):
2280         super(LdbMsgTests, self).setUp()
2281         self.msg = ldb.Message()
2282
2283     def test_init_dn(self):
2284         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
2285         self.assertEqual("dc=foo27", str(self.msg.dn))
2286
2287     def test_iter_items(self):
2288         self.assertEqual(0, len(self.msg.items()))
2289         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
2290         self.assertEqual(1, len(self.msg.items()))
2291
2292     def test_repr(self):
2293         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2294         self.msg["dc"] = b"foo"
2295         if PY3:
2296             self.assertIn(repr(self.msg), [
2297                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2298                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2299             ])
2300             self.assertIn(repr(self.msg.text), [
2301                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2302                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2303             ])
2304         else:
2305             self.assertEqual(
2306                 repr(self.msg),
2307                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
2308             self.assertEqual(
2309                 repr(self.msg.text),
2310                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
2311
2312     def test_len(self):
2313         self.assertEqual(0, len(self.msg))
2314
2315     def test_notpresent(self):
2316         self.assertRaises(KeyError, lambda: self.msg["foo"])
2317
2318     def test_del(self):
2319         del self.msg["foo"]
2320
2321     def test_add(self):
2322         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2323
2324     def test_add_text(self):
2325         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2326
2327     def test_elements_empty(self):
2328         self.assertEqual([], self.msg.elements())
2329
2330     def test_elements(self):
2331         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2332         self.msg.add(el)
2333         self.assertEqual([el], self.msg.elements())
2334         self.assertEqual([el.text], self.msg.text.elements())
2335
2336     def test_add_value(self):
2337         self.assertEqual(0, len(self.msg))
2338         self.msg["foo"] = [b"foo"]
2339         self.assertEqual(1, len(self.msg))
2340
2341     def test_add_value_text(self):
2342         self.assertEqual(0, len(self.msg))
2343         self.msg["foo"] = ["foo"]
2344         self.assertEqual(1, len(self.msg))
2345
2346     def test_add_value_multiple(self):
2347         self.assertEqual(0, len(self.msg))
2348         self.msg["foo"] = [b"foo", b"bla"]
2349         self.assertEqual(1, len(self.msg))
2350         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2351
2352     def test_add_value_multiple_text(self):
2353         self.assertEqual(0, len(self.msg))
2354         self.msg["foo"] = ["foo", "bla"]
2355         self.assertEqual(1, len(self.msg))
2356         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2357
2358     def test_set_value(self):
2359         self.msg["foo"] = [b"fool"]
2360         self.assertEqual([b"fool"], list(self.msg["foo"]))
2361         self.msg["foo"] = [b"bar"]
2362         self.assertEqual([b"bar"], list(self.msg["foo"]))
2363
2364     def test_set_value_text(self):
2365         self.msg["foo"] = ["fool"]
2366         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2367         self.msg["foo"] = ["bar"]
2368         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2369
2370     def test_keys(self):
2371         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2372         self.msg["foo"] = [b"bla"]
2373         self.msg["bar"] = [b"bla"]
2374         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
2375
2376     def test_keys_text(self):
2377         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2378         self.msg["foo"] = ["bla"]
2379         self.msg["bar"] = ["bla"]
2380         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
2381
2382     def test_dn(self):
2383         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2384         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2385
2386     def test_get_dn(self):
2387         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2388         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2389
2390     def test_dn_text(self):
2391         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2392         self.assertEqual("@BASEINFO", str(self.msg.dn))
2393         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2394
2395     def test_get_dn_text(self):
2396         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2397         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2398         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2399
2400     def test_get_invalid(self):
2401         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2402         self.assertRaises(TypeError, self.msg.get, 42)
2403
2404     def test_get_other(self):
2405         self.msg["foo"] = [b"bar"]
2406         self.assertEqual(b"bar", self.msg.get("foo")[0])
2407         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2408         self.assertEqual(None, self.msg.get("foo", idx=1))
2409         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2410
2411     def test_get_other_text(self):
2412         self.msg["foo"] = ["bar"]
2413         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2414         self.assertEqual("bar", self.msg.text.get("foo")[0])
2415         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2416         self.assertEqual(None, self.msg.get("foo", idx=1))
2417         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2418
2419     def test_get_default(self):
2420         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2421         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2422
2423     def test_get_default_text(self):
2424         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2425         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2426
2427     def test_get_unknown(self):
2428         self.assertEqual(None, self.msg.get("lalalala"))
2429
2430     def test_get_unknown_text(self):
2431         self.assertEqual(None, self.msg.text.get("lalalala"))
2432
2433     def test_msg_diff(self):
2434         l = ldb.Ldb()
2435         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2436         msg1 = next(msgs)[1]
2437         msg2 = next(msgs)[1]
2438         msgdiff = l.msg_diff(msg1, msg2)
2439         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2440         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2441         self.assertEqual(1, len(msgdiff))
2442
2443     def test_equal_empty(self):
2444         msg1 = ldb.Message()
2445         msg2 = ldb.Message()
2446         self.assertEqual(msg1, msg2)
2447
2448     def test_equal_simplel(self):
2449         db = ldb.Ldb()
2450         msg1 = ldb.Message()
2451         msg1.dn = ldb.Dn(db, "foo=bar")
2452         msg2 = ldb.Message()
2453         msg2.dn = ldb.Dn(db, "foo=bar")
2454         self.assertEqual(msg1, msg2)
2455         msg1['foo'] = b'bar'
2456         msg2['foo'] = b'bar'
2457         self.assertEqual(msg1, msg2)
2458         msg2['foo'] = b'blie'
2459         self.assertNotEqual(msg1, msg2)
2460         msg2['foo'] = b'blie'
2461
2462     def test_from_dict(self):
2463         rec = {"dn": "dc=fromdict",
2464                "a1": [b"a1-val1", b"a1-val1"]}
2465         l = ldb.Ldb()
2466         # check different types of input Flags
2467         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2468             m = ldb.Message.from_dict(l, rec, flags)
2469             self.assertEqual(rec["a1"], list(m["a1"]))
2470             self.assertEqual(flags, m["a1"].flags())
2471         # check input params
2472         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2473         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2474         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2475         # Message.from_dict expects dictionary with 'dn'
2476         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2477         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2478
2479     def test_from_dict_text(self):
2480         rec = {"dn": "dc=fromdict",
2481                "a1": ["a1-val1", "a1-val1"]}
2482         l = ldb.Ldb()
2483         # check different types of input Flags
2484         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2485             m = ldb.Message.from_dict(l, rec, flags)
2486             self.assertEqual(rec["a1"], list(m.text["a1"]))
2487             self.assertEqual(flags, m.text["a1"].flags())
2488         # check input params
2489         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2490         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2491         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2492         # Message.from_dict expects dictionary with 'dn'
2493         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2494         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2495
2496     def test_copy_add_message_element(self):
2497         m = ldb.Message()
2498         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2499         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2500         mto = ldb.Message()
2501         mto["1"] = m["1"]
2502         mto["2"] = m["2"]
2503         self.assertEqual(mto["1"], m["1"])
2504         self.assertEqual(mto["2"], m["2"])
2505         mto = ldb.Message()
2506         mto.add(m["1"])
2507         mto.add(m["2"])
2508         self.assertEqual(mto["1"], m["1"])
2509         self.assertEqual(mto["2"], m["2"])
2510
2511     def test_copy_add_message_element_text(self):
2512         m = ldb.Message()
2513         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2514         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2515         mto = ldb.Message()
2516         mto["1"] = m["1"]
2517         mto["2"] = m["2"]
2518         self.assertEqual(mto["1"], m.text["1"])
2519         self.assertEqual(mto["2"], m.text["2"])
2520         mto = ldb.Message()
2521         mto.add(m["1"])
2522         mto.add(m["2"])
2523         self.assertEqual(mto.text["1"], m.text["1"])
2524         self.assertEqual(mto.text["2"], m.text["2"])
2525         self.assertEqual(mto["1"], m["1"])
2526         self.assertEqual(mto["2"], m["2"])
2527
2528
class MessageElementTests(TestCase):
    # Tests for ldb.MessageElement and for its .text view.

    def test_cmp_element(self):
        # Elements holding the same value are equal; differing values
        # make them unequal.
        first = ldb.MessageElement([b"foo"])
        same = ldb.MessageElement([b"foo"])
        other = ldb.MessageElement([b"bzr"])
        self.assertEqual(first, same)
        self.assertNotEqual(first, other)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from the same text.
        from_bytes = ldb.MessageElement([b"foo"])
        from_text = ldb.MessageElement(["foo"])
        self.assertEqual(from_bytes, from_text)

    def test_create_iterable(self):
        # Iterating the element gives bytes, iterating .text gives text.
        element = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(element))
        self.assertEqual(["foo"], list(element.text))

    def test_repr(self):
        # repr() shows the values as a bytes repr, which differs between
        # Python 2 and 3; the .text view appends ".text" to that.
        element = ldb.MessageElement([b"foo"])
        expected = "MessageElement([b'foo'])" if PY3 else "MessageElement(['foo'])"
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

        element = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(element))
        if PY3:
            expected = "MessageElement([b'foo',b'bla'])"
        else:
            expected = "MessageElement(['foo','bla'])"
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

    def test_get_item(self):
        # Positive and negative indexing work; an index past the end
        # raises IndexError.
        element = ldb.MessageElement([b"foo", b"bar"])
        for index, value in ((0, b"foo"), (1, b"bar"), (-1, b"bar")):
            self.assertEqual(value, element[index])
        with self.assertRaises(IndexError):
            element[45]

    def test_get_item_text(self):
        # Indexing through .text gives text values; the out-of-range
        # check is made on the element itself.
        element = ldb.MessageElement(["foo", "bar"])
        for index, value in ((0, "foo"), (1, "bar"), (-1, "bar")):
            self.assertEqual(value, element.text[index])
        with self.assertRaises(IndexError):
            element[45]

    def test_len(self):
        # len() is the number of values held.
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(element))

    def test_eq(self):
        # Equality follows the held values as either side is replaced.
        left = ldb.MessageElement([b"foo", b"bar"])
        right = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(right, left)
        left = ldb.MessageElement([b"foo"])
        self.assertNotEqual(right, left)
        right = ldb.MessageElement([b"foo"])
        self.assertEqual(right, left)

    def test_extended(self):
        # An element built with explicit flags and a name has the same
        # repr() form as one built from values alone.
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        expected = "MessageElement([b'456'])" if PY3 else "MessageElement(['456'])"
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

    def test_bad_text(self):
        # Bytes that cannot be decoded raise UnicodeDecodeError when an
        # item is fetched through the .text view.
        element = ldb.MessageElement(b'\xba\xdd')
        fetch = element.text.__getitem__
        with self.assertRaises(UnicodeDecodeError):
            fetch(0)
2604
2605
class ModuleTests(TestCase):
    # Registration and loading of ldb modules written in Python, run
    # against a database file inside a per-test temporary directory.

    def setUp(self):
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # Chain to the base class tearDown (not setUp) so that the
        # teardown side of the fixture protocol is honoured.
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        # A class that only carries a name can be registered.
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # A registered module is instantiated when a database that lists
        # it in @MODULES is opened, not when the @MODULES record is added.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                # record the construction so the test can observe it
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        db = ldb.Ldb(self.filename)
        db.add({"dn": "@MODULES", "@LIST": "bla"})
        self.assertEqual([], ops)
        # re-opening the file is what loads the module
        db = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2645
2646
class LdbResultTests(LdbBaseTest):
    # Tests for the result object returned by Ldb.search() and for
    # Ldb.search_iterator(), run against a small pre-populated database.

    def setUp(self):
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        # self.index is optional (subclasses may set it).  Only the
        # attribute lookup is guarded, so an AttributeError raised inside
        # add() itself is not silently swallowed.
        try:
            index = self.index
        except AttributeError:
            pass
        else:
            self.l.add(index)
        # (dn, name, objectUUID) of the 13 records every test starts with
        records = [
            ("DC=SAMBA,DC=ORG", b"samba.org", b"0123456789abcde0"),
            ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", b"0123456789abcde1"),
            ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", b"0123456789abcde2"),
            ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", b"0123456789abcde3"),
            ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", b"0123456789abcde4"),
            ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", b"0123456789abcde5"),
            ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", b"0123456789abcde6"),
            ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", b"0123456789abcde7"),
            ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", b"0123456789abcde8"),
            ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", b"0123456789abcde9"),
            ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", b"0123456789abcdea"),
            ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", b"0123456789abcdeb"),
            ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", b"0123456789abcdec"),
        ]
        for dn, name, uuid in records:
            self.l.add({"dn": dn, "name": name, "objectUUID": uuid})

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def test_return_type(self):
        # str() of a search result is a fixed marker string.
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        msgs = res.msgs

    def test_get_controls(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        controls = res.controls

    def test_get_referals(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        referals = res.referals

    def test_iter_msgs(self):
        # Iterating res.msgs must reach the last record added in setUp.
        found = False
        for msg in self.l.search().msgs:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        # Only checks that res.controls is iterable.
        res = self.l.search().controls
        it = iter(res)

    def test_create_control(self):
        # An unknown control name is a ValueError; "relax:1" parses into
        # a critical control with the expected OID.
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        # Only checks that res.referals is iterable.
        res = self.l.search().referals
        it = iter(res)

    def test_search_sequence_msgs(self):
        found = False
        res = self.l.search().msgs

        # Deliberately index by position: this exercises len() and
        # item access on the msgs sequence rather than iteration.
        for i in range(0, len(res)):
            msg = res[i]
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        # The result object itself can be iterated for messages.
        found = False
        res = self.l.search()

        for msg in res:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        # search_iterator() yields the same messages lazily.
        found = False
        res = self.l.search_iterator()

        for msg in res:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction

    def test_search_against_trans(self):
        found11 = False

        # r1/w1: child -> parent notifications, r2/w2: parent -> child
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()
        # Close the pipe ends once the test is over so that they do not
        # leak into the rest of the test run.
        for fd in (r1, w1, r2, w2):
            self.addCleanup(os.close, fd)

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            # leave via _exit so the child never returns into the runner
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1: child -> parent notifications, r2/w2: parent -> child
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()
        # Close the pipe ends once the test is over so that they do not
        # leak into the rest of the test run.
        for fd in (r1, w1, r2, w2):
            self.addCleanup(os.close, fd)

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(res)
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            # leave via _exit so the child never returns into the runner
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for msg in res:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(msg.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
2914
2915
class LdbResultTestsLmdb(LdbResultTests):
    # Re-runs every LdbResultTests case with the mdb:// URL prefix and
    # the MDB index record.

    def setUp(self):
        # Both attributes must be in place before the inherited setUp
        # runs, since that is where they are read.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend-specific to undo; defer to the parent class.
        super(LdbResultTestsLmdb, self).tearDown()
2925
2926
class BadTypeTests(TestCase):
    # Arguments of the wrong Python type must be refused with TypeError.

    def test_control(self):
        l = ldb.Ldb()
        # first argument is not an Ldb
        self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
        # Pass the real Ldb handle (not the ldb module) so that it is the
        # integer data argument that gets rejected.
        self.assertRaises(TypeError, ldb.Control, l, 1234)

    def test_modify(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        m = ldb.Message(dn)
        self.assertRaises(TypeError, l.modify, '<bad type>')
        self.assertRaises(TypeError, l.modify, m, '<bad type>')

    def test_add(self):
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        m = ldb.Message(dn)
        self.assertRaises(TypeError, l.add, '<bad type>')
        self.assertRaises(TypeError, l.add, m, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite its name this test calls l.add, not
        # l.delete -- looks like copy/paste.  Confirm how delete() treats
        # a plain string before switching the calls over.
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        self.assertRaises(TypeError, l.add, '<bad type>')
        self.assertRaises(TypeError, l.add, dn, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite its name this test calls l.add, not
        # l.rename -- looks like copy/paste.  Confirm how rename() treats
        # a plain string before switching the calls over.
        l = ldb.Ldb()
        dn = ldb.Dn(l, 'a=b')
        self.assertRaises(TypeError, l.add, '<bad type>', dn)
        self.assertRaises(TypeError, l.add, dn, '<bad type>')
        self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')

    def test_search(self):
        # each keyword argument is given a value of the wrong type in turn
        l = ldb.Ldb()
        self.assertRaises(TypeError, l.search, base=1234)
        self.assertRaises(TypeError, l.search, scope='<bad type>')
        self.assertRaises(TypeError, l.search, expression=1234)
        self.assertRaises(TypeError, l.search, attrs='<bad type>')
        self.assertRaises(TypeError, l.search, controls='<bad type>')
2967
2968
class VersionTests(TestCase):
    # The module must expose its version as a str.

    def test_version(self):
        version = ldb.__version__
        is_string = isinstance(version, str)
        self.assertTrue(is_string)
2973
2974
# When executed as a script, hand control to unittest's command-line
# runner (unittest.main is the same object as unittest.TestProgram).
if __name__ == '__main__':
    import unittest
    unittest.main()