ldb: The test api.py should not rely on order of entries in dict
[samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python3
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
13 PY3 = sys.version_info > (3, 0)
14
15 TDB_PREFIX = "tdb://"
16 MDB_PREFIX = "mdb://"
17
18 MDB_INDEX_OBJ = {
19     "dn": "@INDEXLIST",
20     "@IDXONE": [b"1"],
21     "@IDXGUID": [b"objectUUID"],
22     "@IDX_DN_GUID": [b"GUID"]
23 }
24
25
26 def tempdir():
27     import tempfile
28     try:
29         dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
30     except KeyError:
31         dir_prefix = None
32     return tempfile.mkdtemp(dir=dir_prefix)
33
34
35 class NoContextTests(TestCase):
36
37     def test_valid_attr_name(self):
38         self.assertTrue(ldb.valid_attr_name("foo"))
39         self.assertFalse(ldb.valid_attr_name("24foo"))
40
41     def test_timestring(self):
42         self.assertEqual("19700101000000.0Z", ldb.timestring(0))
43         self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
44
45     def test_string_to_time(self):
46         self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
47         self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
48
49     def test_binary_encode(self):
50         encoded = ldb.binary_encode(b'test\\x')
51         decoded = ldb.binary_decode(encoded)
52         self.assertEqual(decoded, b'test\\x')
53
54         encoded2 = ldb.binary_encode('test\\x')
55         self.assertEqual(encoded2, encoded)
56
57
58 class LdbBaseTest(TestCase):
59     def setUp(self):
60         super(LdbBaseTest, self).setUp()
61         try:
62             if self.prefix is None:
63                 self.prefix = TDB_PREFIX
64         except AttributeError:
65             self.prefix = TDB_PREFIX
66
67     def tearDown(self):
68         super(LdbBaseTest, self).tearDown()
69
70     def url(self):
71         return self.prefix + self.filename
72
73     def flags(self):
74         if self.prefix == MDB_PREFIX:
75             return ldb.FLG_NOSYNC
76         else:
77             return 0
78
79
80 class SimpleLdb(LdbBaseTest):
81
82     def setUp(self):
83         super(SimpleLdb, self).setUp()
84         self.testdir = tempdir()
85         self.filename = os.path.join(self.testdir, "test.ldb")
86         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
87         try:
88             self.ldb.add(self.index)
89         except AttributeError:
90             pass
91
92     def tearDown(self):
93         shutil.rmtree(self.testdir)
94         super(SimpleLdb, self).tearDown()
95         # Ensure the LDB is closed now, so we close the FD
96         del(self.ldb)
97
98     def test_connect(self):
99         ldb.Ldb(self.url(), flags=self.flags())
100
101     def test_connect_none(self):
102         ldb.Ldb()
103
104     def test_connect_later(self):
105         x = ldb.Ldb()
106         x.connect(self.url(), flags=self.flags())
107
108     def test_repr(self):
109         x = ldb.Ldb()
110         self.assertTrue(repr(x).startswith("<ldb connection"))
111
112     def test_set_create_perms(self):
113         x = ldb.Ldb()
114         x.set_create_perms(0o600)
115
116     def test_modules_none(self):
117         x = ldb.Ldb()
118         self.assertEqual([], x.modules())
119
120     def test_modules_tdb(self):
121         x = ldb.Ldb(self.url(), flags=self.flags())
122         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
123
124     def test_firstmodule_none(self):
125         x = ldb.Ldb()
126         self.assertEqual(x.firstmodule, None)
127
128     def test_firstmodule_tdb(self):
129         x = ldb.Ldb(self.url(), flags=self.flags())
130         mod = x.firstmodule
131         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
132
133     def test_search(self):
134         l = ldb.Ldb(self.url(), flags=self.flags())
135         self.assertEqual(len(l.search()), 0)
136
137     def test_search_controls(self):
138         l = ldb.Ldb(self.url(), flags=self.flags())
139         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
140
141     def test_utf8_ldb_Dn(self):
142         l = ldb.Ldb(self.url(), flags=self.flags())
143         dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
144
145     def test_utf8_encoded_ldb_Dn(self):
146         l = ldb.Ldb(self.url(), flags=self.flags())
147         dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
148         try:
149             dn = ldb.Dn(l, dn_encoded_utf8)
150         except UnicodeDecodeError as e:
151                 raise
152         except TypeError as te:
153             if PY3:
154                p3errors = ["argument 2 must be str, not bytes",
155                            "Can't convert 'bytes' object to str implicitly"]
156                self.assertIn(str(te), p3errors)
157             else:
158                raise
159
160     def test_search_attrs(self):
161         l = ldb.Ldb(self.url(), flags=self.flags())
162         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
163
164     def test_search_string_dn(self):
165         l = ldb.Ldb(self.url(), flags=self.flags())
166         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
167
168     def test_search_attr_string(self):
169         l = ldb.Ldb(self.url(), flags=self.flags())
170         self.assertRaises(TypeError, l.search, attrs="dc")
171         self.assertRaises(TypeError, l.search, attrs=b"dc")
172
173     def test_opaque(self):
174         l = ldb.Ldb(self.url(), flags=self.flags())
175         l.set_opaque("my_opaque", l)
176         self.assertTrue(l.get_opaque("my_opaque") is not None)
177         self.assertEqual(None, l.get_opaque("unknown"))
178
179     def test_search_scope_base_empty_db(self):
180         l = ldb.Ldb(self.url(), flags=self.flags())
181         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
182                                       ldb.SCOPE_BASE)), 0)
183
184     def test_search_scope_onelevel_empty_db(self):
185         l = ldb.Ldb(self.url(), flags=self.flags())
186         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
187                                       ldb.SCOPE_ONELEVEL)), 0)
188
189     def test_delete(self):
190         l = ldb.Ldb(self.url(), flags=self.flags())
191         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
192
193     def test_delete_w_unhandled_ctrl(self):
194         l = ldb.Ldb(self.url(), flags=self.flags())
195         m = ldb.Message()
196         m.dn = ldb.Dn(l, "dc=foo1")
197         m["b"] = [b"a"]
198         m["objectUUID"] = b"0123456789abcdef"
199         l.add(m)
200         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
201         l.delete(m.dn)
202
203     def test_contains(self):
204         name = self.url()
205         l = ldb.Ldb(name, flags=self.flags())
206         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
207         l = ldb.Ldb(name, flags=self.flags())
208         m = ldb.Message()
209         m.dn = ldb.Dn(l, "dc=foo3")
210         m["b"] = ["a"]
211         m["objectUUID"] = b"0123456789abcdef"
212         l.add(m)
213         try:
214             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
215             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
216         finally:
217             l.delete(m.dn)
218
219     def test_get_config_basedn(self):
220         l = ldb.Ldb(self.url(), flags=self.flags())
221         self.assertEqual(None, l.get_config_basedn())
222
223     def test_get_root_basedn(self):
224         l = ldb.Ldb(self.url(), flags=self.flags())
225         self.assertEqual(None, l.get_root_basedn())
226
227     def test_get_schema_basedn(self):
228         l = ldb.Ldb(self.url(), flags=self.flags())
229         self.assertEqual(None, l.get_schema_basedn())
230
231     def test_get_default_basedn(self):
232         l = ldb.Ldb(self.url(), flags=self.flags())
233         self.assertEqual(None, l.get_default_basedn())
234
235     def test_add(self):
236         l = ldb.Ldb(self.url(), flags=self.flags())
237         m = ldb.Message()
238         m.dn = ldb.Dn(l, "dc=foo4")
239         m["bla"] = b"bla"
240         m["objectUUID"] = b"0123456789abcdef"
241         self.assertEqual(len(l.search()), 0)
242         l.add(m)
243         try:
244             self.assertEqual(len(l.search()), 1)
245         finally:
246             l.delete(ldb.Dn(l, "dc=foo4"))
247
248     def test_search_iterator(self):
249         l = ldb.Ldb(self.url(), flags=self.flags())
250         s = l.search_iterator()
251         s.abandon()
252         try:
253             for me in s:
254                 self.fail()
255             self.fail()
256         except RuntimeError as re:
257             pass
258         try:
259             s.abandon()
260             self.fail()
261         except RuntimeError as re:
262             pass
263         try:
264             s.result()
265             self.fail()
266         except RuntimeError as re:
267             pass
268
269         s = l.search_iterator()
270         count = 0
271         for me in s:
272             self.assertTrue(isinstance(me, ldb.Message))
273             count += 1
274         r = s.result()
275         self.assertEqual(len(r), 0)
276         self.assertEqual(count, 0)
277
278         m1 = ldb.Message()
279         m1.dn = ldb.Dn(l, "dc=foo4")
280         m1["bla"] = b"bla"
281         m1["objectUUID"] = b"0123456789abcdef"
282         l.add(m1)
283         try:
284             s = l.search_iterator()
285             msgs = []
286             for me in s:
287                 self.assertTrue(isinstance(me, ldb.Message))
288                 count += 1
289                 msgs.append(me)
290             r = s.result()
291             self.assertEqual(len(r), 0)
292             self.assertEqual(len(msgs), 1)
293             self.assertEqual(msgs[0].dn, m1.dn)
294
295             m2 = ldb.Message()
296             m2.dn = ldb.Dn(l, "dc=foo5")
297             m2["bla"] = b"bla"
298             m2["objectUUID"] = b"0123456789abcdee"
299             l.add(m2)
300
301             s = l.search_iterator()
302             msgs = []
303             for me in s:
304                 self.assertTrue(isinstance(me, ldb.Message))
305                 count += 1
306                 msgs.append(me)
307             r = s.result()
308             self.assertEqual(len(r), 0)
309             self.assertEqual(len(msgs), 2)
310             if msgs[0].dn == m1.dn:
311                 self.assertEqual(msgs[0].dn, m1.dn)
312                 self.assertEqual(msgs[1].dn, m2.dn)
313             else:
314                 self.assertEqual(msgs[0].dn, m2.dn)
315                 self.assertEqual(msgs[1].dn, m1.dn)
316
317             s = l.search_iterator()
318             msgs = []
319             for me in s:
320                 self.assertTrue(isinstance(me, ldb.Message))
321                 count += 1
322                 msgs.append(me)
323                 break
324             try:
325                 s.result()
326                 self.fail()
327             except RuntimeError as re:
328                 pass
329             for me in s:
330                 self.assertTrue(isinstance(me, ldb.Message))
331                 count += 1
332                 msgs.append(me)
333                 break
334             for me in s:
335                 self.fail()
336
337             r = s.result()
338             self.assertEqual(len(r), 0)
339             self.assertEqual(len(msgs), 2)
340             if msgs[0].dn == m1.dn:
341                 self.assertEqual(msgs[0].dn, m1.dn)
342                 self.assertEqual(msgs[1].dn, m2.dn)
343             else:
344                 self.assertEqual(msgs[0].dn, m2.dn)
345                 self.assertEqual(msgs[1].dn, m1.dn)
346         finally:
347             l.delete(ldb.Dn(l, "dc=foo4"))
348             l.delete(ldb.Dn(l, "dc=foo5"))
349
350     def test_add_text(self):
351         l = ldb.Ldb(self.url(), flags=self.flags())
352         m = ldb.Message()
353         m.dn = ldb.Dn(l, "dc=foo4")
354         m["bla"] = "bla"
355         m["objectUUID"] = b"0123456789abcdef"
356         self.assertEqual(len(l.search()), 0)
357         l.add(m)
358         try:
359             self.assertEqual(len(l.search()), 1)
360         finally:
361             l.delete(ldb.Dn(l, "dc=foo4"))
362
363     def test_add_w_unhandled_ctrl(self):
364         l = ldb.Ldb(self.url(), flags=self.flags())
365         m = ldb.Message()
366         m.dn = ldb.Dn(l, "dc=foo4")
367         m["bla"] = b"bla"
368         self.assertEqual(len(l.search()), 0)
369         self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
370
371     def test_add_dict(self):
372         l = ldb.Ldb(self.url(), flags=self.flags())
373         m = {"dn": ldb.Dn(l, "dc=foo5"),
374              "bla": b"bla",
375              "objectUUID": b"0123456789abcdef"}
376         self.assertEqual(len(l.search()), 0)
377         l.add(m)
378         try:
379             self.assertEqual(len(l.search()), 1)
380         finally:
381             l.delete(ldb.Dn(l, "dc=foo5"))
382
383     def test_add_dict_text(self):
384         l = ldb.Ldb(self.url(), flags=self.flags())
385         m = {"dn": ldb.Dn(l, "dc=foo5"),
386              "bla": "bla",
387              "objectUUID": b"0123456789abcdef"}
388         self.assertEqual(len(l.search()), 0)
389         l.add(m)
390         try:
391             self.assertEqual(len(l.search()), 1)
392         finally:
393             l.delete(ldb.Dn(l, "dc=foo5"))
394
395     def test_add_dict_string_dn(self):
396         l = ldb.Ldb(self.url(), flags=self.flags())
397         m = {"dn": "dc=foo6", "bla": b"bla",
398              "objectUUID": b"0123456789abcdef"}
399         self.assertEqual(len(l.search()), 0)
400         l.add(m)
401         try:
402             self.assertEqual(len(l.search()), 1)
403         finally:
404             l.delete(ldb.Dn(l, "dc=foo6"))
405
406     def test_add_dict_bytes_dn(self):
407         l = ldb.Ldb(self.url(), flags=self.flags())
408         m = {"dn": b"dc=foo6", "bla": b"bla",
409              "objectUUID": b"0123456789abcdef"}
410         self.assertEqual(len(l.search()), 0)
411         l.add(m)
412         try:
413             self.assertEqual(len(l.search()), 1)
414         finally:
415             l.delete(ldb.Dn(l, "dc=foo6"))
416
417     def test_rename(self):
418         l = ldb.Ldb(self.url(), flags=self.flags())
419         m = ldb.Message()
420         m.dn = ldb.Dn(l, "dc=foo7")
421         m["bla"] = b"bla"
422         m["objectUUID"] = b"0123456789abcdef"
423         self.assertEqual(len(l.search()), 0)
424         l.add(m)
425         try:
426             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
427             self.assertEqual(len(l.search()), 1)
428         finally:
429             l.delete(ldb.Dn(l, "dc=bar"))
430
431     def test_rename_string_dns(self):
432         l = ldb.Ldb(self.url(), flags=self.flags())
433         m = ldb.Message()
434         m.dn = ldb.Dn(l, "dc=foo8")
435         m["bla"] = b"bla"
436         m["objectUUID"] = b"0123456789abcdef"
437         self.assertEqual(len(l.search()), 0)
438         l.add(m)
439         self.assertEqual(len(l.search()), 1)
440         try:
441             l.rename("dc=foo8", "dc=bar")
442             self.assertEqual(len(l.search()), 1)
443         finally:
444             l.delete(ldb.Dn(l, "dc=bar"))
445
446     def test_rename_bad_string_dns(self):
447         l = ldb.Ldb(self.url(), flags=self.flags())
448         m = ldb.Message()
449         m.dn = ldb.Dn(l, "dc=foo8")
450         m["bla"] = b"bla"
451         m["objectUUID"] = b"0123456789abcdef"
452         self.assertEqual(len(l.search()), 0)
453         l.add(m)
454         self.assertEqual(len(l.search()), 1)
455         self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
456         self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
457         l.delete(ldb.Dn(l, "dc=foo8"))
458
459     def test_empty_dn(self):
460         l = ldb.Ldb(self.url(), flags=self.flags())
461         self.assertEqual(0, len(l.search()))
462         m = ldb.Message()
463         m.dn = ldb.Dn(l, "dc=empty")
464         m["objectUUID"] = b"0123456789abcdef"
465         l.add(m)
466         rm = l.search()
467         self.assertEqual(1, len(rm))
468         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
469                          set(rm[0].keys()))
470
471         rm = l.search(m.dn)
472         self.assertEqual(1, len(rm))
473         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
474                          set(rm[0].keys()))
475         rm = l.search(m.dn, attrs=["blah"])
476         self.assertEqual(1, len(rm))
477         self.assertEqual(0, len(rm[0]))
478
479     def test_modify_delete(self):
480         l = ldb.Ldb(self.url(), flags=self.flags())
481         m = ldb.Message()
482         m.dn = ldb.Dn(l, "dc=modifydelete")
483         m["bla"] = [b"1234"]
484         m["objectUUID"] = b"0123456789abcdef"
485         l.add(m)
486         rm = l.search(m.dn)[0]
487         self.assertEqual([b"1234"], list(rm["bla"]))
488         try:
489             m = ldb.Message()
490             m.dn = ldb.Dn(l, "dc=modifydelete")
491             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
492             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
493             l.modify(m)
494             rm = l.search(m.dn)
495             self.assertEqual(1, len(rm))
496             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
497                              set(rm[0].keys()))
498             rm = l.search(m.dn, attrs=["bla"])
499             self.assertEqual(1, len(rm))
500             self.assertEqual(0, len(rm[0]))
501         finally:
502             l.delete(ldb.Dn(l, "dc=modifydelete"))
503
504     def test_modify_delete_text(self):
505         l = ldb.Ldb(self.url(), flags=self.flags())
506         m = ldb.Message()
507         m.dn = ldb.Dn(l, "dc=modifydelete")
508         m.text["bla"] = ["1234"]
509         m["objectUUID"] = b"0123456789abcdef"
510         l.add(m)
511         rm = l.search(m.dn)[0]
512         self.assertEqual(["1234"], list(rm.text["bla"]))
513         try:
514             m = ldb.Message()
515             m.dn = ldb.Dn(l, "dc=modifydelete")
516             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
517             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
518             l.modify(m)
519             rm = l.search(m.dn)
520             self.assertEqual(1, len(rm))
521             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
522                              set(rm[0].keys()))
523             rm = l.search(m.dn, attrs=["bla"])
524             self.assertEqual(1, len(rm))
525             self.assertEqual(0, len(rm[0]))
526         finally:
527             l.delete(ldb.Dn(l, "dc=modifydelete"))
528
529     def test_modify_add(self):
530         l = ldb.Ldb(self.url(), flags=self.flags())
531         m = ldb.Message()
532         m.dn = ldb.Dn(l, "dc=add")
533         m["bla"] = [b"1234"]
534         m["objectUUID"] = b"0123456789abcdef"
535         l.add(m)
536         try:
537             m = ldb.Message()
538             m.dn = ldb.Dn(l, "dc=add")
539             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
540             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
541             l.modify(m)
542             rm = l.search(m.dn)[0]
543             self.assertEqual(3, len(rm))
544             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
545         finally:
546             l.delete(ldb.Dn(l, "dc=add"))
547
548     def test_modify_add_text(self):
549         l = ldb.Ldb(self.url(), flags=self.flags())
550         m = ldb.Message()
551         m.dn = ldb.Dn(l, "dc=add")
552         m.text["bla"] = ["1234"]
553         m["objectUUID"] = b"0123456789abcdef"
554         l.add(m)
555         try:
556             m = ldb.Message()
557             m.dn = ldb.Dn(l, "dc=add")
558             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
559             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
560             l.modify(m)
561             rm = l.search(m.dn)[0]
562             self.assertEqual(3, len(rm))
563             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
564         finally:
565             l.delete(ldb.Dn(l, "dc=add"))
566
567     def test_modify_replace(self):
568         l = ldb.Ldb(self.url(), flags=self.flags())
569         m = ldb.Message()
570         m.dn = ldb.Dn(l, "dc=modify2")
571         m["bla"] = [b"1234", b"456"]
572         m["objectUUID"] = b"0123456789abcdef"
573         l.add(m)
574         try:
575             m = ldb.Message()
576             m.dn = ldb.Dn(l, "dc=modify2")
577             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
578             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
579             l.modify(m)
580             rm = l.search(m.dn)[0]
581             self.assertEqual(3, len(rm))
582             self.assertEqual([b"789"], list(rm["bla"]))
583             rm = l.search(m.dn, attrs=["bla"])[0]
584             self.assertEqual(1, len(rm))
585         finally:
586             l.delete(ldb.Dn(l, "dc=modify2"))
587
588     def test_modify_replace_text(self):
589         l = ldb.Ldb(self.url(), flags=self.flags())
590         m = ldb.Message()
591         m.dn = ldb.Dn(l, "dc=modify2")
592         m.text["bla"] = ["1234", "456"]
593         m["objectUUID"] = b"0123456789abcdef"
594         l.add(m)
595         try:
596             m = ldb.Message()
597             m.dn = ldb.Dn(l, "dc=modify2")
598             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
599             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
600             l.modify(m)
601             rm = l.search(m.dn)[0]
602             self.assertEqual(3, len(rm))
603             self.assertEqual(["789"], list(rm.text["bla"]))
604             rm = l.search(m.dn, attrs=["bla"])[0]
605             self.assertEqual(1, len(rm))
606         finally:
607             l.delete(ldb.Dn(l, "dc=modify2"))
608
609     def test_modify_flags_change(self):
610         l = ldb.Ldb(self.url(), flags=self.flags())
611         m = ldb.Message()
612         m.dn = ldb.Dn(l, "dc=add")
613         m["bla"] = [b"1234"]
614         m["objectUUID"] = b"0123456789abcdef"
615         l.add(m)
616         try:
617             m = ldb.Message()
618             m.dn = ldb.Dn(l, "dc=add")
619             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
620             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
621             l.modify(m)
622             rm = l.search(m.dn)[0]
623             self.assertEqual(3, len(rm))
624             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
625
626             # Now create another modify, but switch the flags before we do it
627             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
628             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
629             l.modify(m)
630             rm = l.search(m.dn, attrs=["bla"])[0]
631             self.assertEqual(1, len(rm))
632             self.assertEqual([b"1234"], list(rm["bla"]))
633         finally:
634             l.delete(ldb.Dn(l, "dc=add"))
635
636     def test_modify_flags_change_text(self):
637         l = ldb.Ldb(self.url(), flags=self.flags())
638         m = ldb.Message()
639         m.dn = ldb.Dn(l, "dc=add")
640         m.text["bla"] = ["1234"]
641         m["objectUUID"] = b"0123456789abcdef"
642         l.add(m)
643         try:
644             m = ldb.Message()
645             m.dn = ldb.Dn(l, "dc=add")
646             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
647             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
648             l.modify(m)
649             rm = l.search(m.dn)[0]
650             self.assertEqual(3, len(rm))
651             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
652
653             # Now create another modify, but switch the flags before we do it
654             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
655             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
656             l.modify(m)
657             rm = l.search(m.dn, attrs=["bla"])[0]
658             self.assertEqual(1, len(rm))
659             self.assertEqual(["1234"], list(rm.text["bla"]))
660         finally:
661             l.delete(ldb.Dn(l, "dc=add"))
662
663     def test_transaction_commit(self):
664         l = ldb.Ldb(self.url(), flags=self.flags())
665         l.transaction_start()
666         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
667         m["foo"] = [b"bar"]
668         m["objectUUID"] = b"0123456789abcdef"
669         l.add(m)
670         l.transaction_commit()
671         l.delete(m.dn)
672
673     def test_transaction_cancel(self):
674         l = ldb.Ldb(self.url(), flags=self.flags())
675         l.transaction_start()
676         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
677         m["foo"] = [b"bar"]
678         m["objectUUID"] = b"0123456789abcdee"
679         l.add(m)
680         l.transaction_cancel()
681         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
682
683     def test_set_debug(self):
684         def my_report_fn(level, text):
685             pass
686         l = ldb.Ldb(self.url(), flags=self.flags())
687         l.set_debug(my_report_fn)
688
689     def test_zero_byte_string(self):
690         """Testing we do not get trapped in the \0 byte in a property string."""
691         l = ldb.Ldb(self.url(), flags=self.flags())
692         l.add({
693             "dn": b"dc=somedn",
694             "objectclass": b"user",
695             "cN": b"LDAPtestUSER",
696             "givenname": b"ldap",
697             "displayname": b"foo\0bar",
698             "objectUUID": b"0123456789abcdef"
699         })
700         res = l.search(expression="(dn=dc=somedn)")
701         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
702
703     def test_no_crash_broken_expr(self):
704         l = ldb.Ldb(self.url(), flags=self.flags())
705         self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
706
707 # Run the SimpleLdb tests against an lmdb backend
708
709
710 class SimpleLdbLmdb(SimpleLdb):
711
712     def setUp(self):
713         self.prefix = MDB_PREFIX
714         self.index = MDB_INDEX_OBJ
715         super(SimpleLdbLmdb, self).setUp()
716
717     def tearDown(self):
718         super(SimpleLdbLmdb, self).tearDown()
719
720
721 class SearchTests(LdbBaseTest):
722     def tearDown(self):
723         shutil.rmtree(self.testdir)
724         super(SearchTests, self).tearDown()
725
726         # Ensure the LDB is closed now, so we close the FD
727         del(self.l)
728
729     def setUp(self):
730         super(SearchTests, self).setUp()
731         self.testdir = tempdir()
732         self.filename = os.path.join(self.testdir, "search_test.ldb")
733         options = ["modules:rdn_name"]
734         if hasattr(self, 'IDXCHECK'):
735             options.append("disable_full_db_scan_for_self_test:1")
736         self.l = ldb.Ldb(self.url(),
737                          flags=self.flags(),
738                          options=options)
739         try:
740             self.l.add(self.index)
741         except AttributeError:
742             pass
743
744         self.l.add({"dn": "@ATTRIBUTES",
745                     "DC": "CASE_INSENSITIVE"})
746
747         # Note that we can't use the name objectGUID here, as we
748         # want to stay clear of the objectGUID handler in LDB and
749         # instead use just the 16 bytes raw, which we just keep
750         # to printable chars here for ease of handling.
751
752         self.l.add({"dn": "DC=SAMBA,DC=ORG",
753                     "name": b"samba.org",
754                     "objectUUID": b"0123456789abcdef"})
755         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
756                     "name": b"Admins",
757                     "x": "z", "y": "a",
758                     "objectUUID": b"0123456789abcde1"})
759         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
760                     "name": b"Users",
761                     "x": "z", "y": "a",
762                     "objectUUID": b"0123456789abcde2"})
763         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
764                     "name": b"OU #1",
765                     "x": "y", "y": "a",
766                     "objectUUID": b"0123456789abcde3"})
767         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
768                     "name": b"OU #2",
769                     "x": "y", "y": "a",
770                     "objectUUID": b"0123456789abcde4"})
771         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
772                     "name": b"OU #3",
773                     "x": "y", "y": "a",
774                     "objectUUID": b"0123456789abcde5"})
775         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
776                     "name": b"OU #4",
777                     "x": "y", "y": "a",
778                     "objectUUID": b"0123456789abcde6"})
779         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
780                     "name": b"OU #5",
781                     "x": "y", "y": "a",
782                     "objectUUID": b"0123456789abcde7"})
783         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
784                     "name": b"OU #6",
785                     "x": "y", "y": "a",
786                     "objectUUID": b"0123456789abcde8"})
787         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
788                     "name": b"OU #7",
789                     "x": "y", "y": "a",
790                     "objectUUID": b"0123456789abcde9"})
791         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
792                     "name": b"OU #8",
793                     "x": "y", "y": "a",
794                     "objectUUID": b"0123456789abcde0"})
795         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
796                     "name": b"OU #9",
797                     "x": "y", "y": "a",
798                     "objectUUID": b"0123456789abcdea"})
799         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
800                     "name": b"OU #10",
801                     "x": "y", "y": "a",
802                     "objectUUID": b"0123456789abcdeb"})
803         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
804                     "name": b"OU #10",
805                     "x": "y", "y": "a",
806                     "objectUUID": b"0123456789abcdec"})
807         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
808                     "name": b"OU #10",
809                     "x": "y", "y": "b",
810                     "objectUUID": b"0123456789abcded"})
811         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
812                     "name": b"OU #10",
813                     "x": "x", "y": "b",
814                     "objectUUID": b"0123456789abcdee"})
815         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
816                     "name": b"OU #10",
817                     "x": "x", "y": "b",
818                     "objectUUID": b"0123456789abcd01"})
819         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
820                     "name": b"OU #10",
821                     "x": "x", "y": "b",
822                     "objectUUID": b"0123456789abcd02"})
823         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
824                     "name": b"OU #10",
825                     "x": "x", "y": "b",
826                     "objectUUID": b"0123456789abcd03"})
827         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
828                     "name": b"OU #10",
829                     "x": "x", "y": "b",
830                     "objectUUID": b"0123456789abcd04"})
831         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
832                     "name": b"OU #10",
833                     "x": "x", "y": "b",
834                     "objectUUID": b"0123456789abcd05"})
835         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
836                     "name": b"OU #10",
837                     "x": "x", "y": "b",
838                     "objectUUID": b"0123456789abcd06"})
839         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
840                     "name": b"OU #10",
841                     "x": "x", "y": "b",
842                     "objectUUID": b"0123456789abcd07"})
843         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
844                     "name": b"OU #10",
845                     "x": "x", "y": "c",
846                     "objectUUID": b"0123456789abcd08"})
847         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
848                     "name": b"OU #10",
849                     "x": "x", "y": "c",
850                     "objectUUID": b"0123456789abcd09"})
851
852     def test_base(self):
853         """Testing a search"""
854
855         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
856                               scope=ldb.SCOPE_BASE)
857         self.assertEqual(len(res11), 1)
858
859     def test_base_lower(self):
860         """Testing a search"""
861
862         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
863                               scope=ldb.SCOPE_BASE)
864         self.assertEqual(len(res11), 1)
865
866     def test_base_or(self):
867         """Testing a search"""
868
869         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
870                               scope=ldb.SCOPE_BASE,
871                               expression="(|(ou=ou11)(ou=ou12))")
872         self.assertEqual(len(res11), 1)
873
874     def test_base_or2(self):
875         """Testing a search"""
876
877         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
878                               scope=ldb.SCOPE_BASE,
879                               expression="(|(x=y)(y=b))")
880         self.assertEqual(len(res11), 1)
881
882     def test_base_and(self):
883         """Testing a search"""
884
885         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
886                               scope=ldb.SCOPE_BASE,
887                               expression="(&(ou=ou11)(ou=ou12))")
888         self.assertEqual(len(res11), 0)
889
890     def test_base_and2(self):
891         """Testing a search"""
892
893         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
894                               scope=ldb.SCOPE_BASE,
895                               expression="(&(x=y)(y=a))")
896         self.assertEqual(len(res11), 1)
897
898     def test_base_false(self):
899         """Testing a search"""
900
901         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
902                               scope=ldb.SCOPE_BASE,
903                               expression="(|(ou=ou13)(ou=ou12))")
904         self.assertEqual(len(res11), 0)
905
906     def test_check_base_false(self):
907         """Testing a search"""
908         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
909                               scope=ldb.SCOPE_BASE,
910                               expression="(|(ou=ou13)(ou=ou12))")
911         self.assertEqual(len(res11), 0)
912
913     def test_check_base_error(self):
914         """Testing a search"""
915         checkbaseonsearch = {"dn": "@OPTIONS",
916                              "checkBaseOnSearch": b"TRUE"}
917         try:
918             self.l.add(checkbaseonsearch)
919         except ldb.LdbError as err:
920             enum = err.args[0]
921             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
922             m = ldb.Message.from_dict(self.l,
923                                       checkbaseonsearch)
924             self.l.modify(m)
925
926         try:
927             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
928                                   scope=ldb.SCOPE_BASE,
929                                   expression="(|(ou=ou13)(ou=ou12))")
930             self.fail("Should have failed on missing base")
931         except ldb.LdbError as err:
932             enum = err.args[0]
933             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
934
935     def test_subtree_and(self):
936         """Testing a search"""
937
938         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
939                               scope=ldb.SCOPE_SUBTREE,
940                               expression="(&(ou=ou11)(ou=ou12))")
941         self.assertEqual(len(res11), 0)
942
943     def test_subtree_and2(self):
944         """Testing a search"""
945
946         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
947                               scope=ldb.SCOPE_SUBTREE,
948                               expression="(&(x=y)(|(y=b)(y=c)))")
949         self.assertEqual(len(res11), 1)
950
951     def test_subtree_and2_lower(self):
952         """Testing a search"""
953
954         res11 = self.l.search(base="DC=samba,DC=org",
955                               scope=ldb.SCOPE_SUBTREE,
956                               expression="(&(x=y)(|(y=b)(y=c)))")
957         self.assertEqual(len(res11), 1)
958
959     def test_subtree_or(self):
960         """Testing a search"""
961
962         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
963                               scope=ldb.SCOPE_SUBTREE,
964                               expression="(|(ou=ou11)(ou=ou12))")
965         self.assertEqual(len(res11), 2)
966
967     def test_subtree_or2(self):
968         """Testing a search"""
969
970         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
971                               scope=ldb.SCOPE_SUBTREE,
972                               expression="(|(x=y)(y=b))")
973         self.assertEqual(len(res11), 20)
974
975     def test_subtree_or3(self):
976         """Testing a search"""
977
978         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
979                               scope=ldb.SCOPE_SUBTREE,
980                               expression="(|(x=y)(y=b)(y=c))")
981         self.assertEqual(len(res11), 22)
982
983     def test_one_and(self):
984         """Testing a search"""
985
986         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
987                               scope=ldb.SCOPE_ONELEVEL,
988                               expression="(&(ou=ou11)(ou=ou12))")
989         self.assertEqual(len(res11), 0)
990
991     def test_one_and2(self):
992         """Testing a search"""
993
994         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
995                               scope=ldb.SCOPE_ONELEVEL,
996                               expression="(&(x=y)(y=b))")
997         self.assertEqual(len(res11), 1)
998
999     def test_one_or(self):
1000         """Testing a search"""
1001
1002         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1003                               scope=ldb.SCOPE_ONELEVEL,
1004                               expression="(|(ou=ou11)(ou=ou12))")
1005         self.assertEqual(len(res11), 2)
1006
1007     def test_one_or2(self):
1008         """Testing a search"""
1009
1010         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1011                               scope=ldb.SCOPE_ONELEVEL,
1012                               expression="(|(x=y)(y=b))")
1013         self.assertEqual(len(res11), 20)
1014
1015     def test_one_or2_lower(self):
1016         """Testing a search"""
1017
1018         res11 = self.l.search(base="DC=samba,DC=org",
1019                               scope=ldb.SCOPE_ONELEVEL,
1020                               expression="(|(x=y)(y=b))")
1021         self.assertEqual(len(res11), 20)
1022
1023     def test_one_unindexable(self):
1024         """Testing a search"""
1025
1026         try:
1027             res11 = self.l.search(base="DC=samba,DC=org",
1028                                   scope=ldb.SCOPE_ONELEVEL,
1029                                   expression="(y=b*)")
1030             if hasattr(self, 'IDX') and \
1031                not hasattr(self, 'IDXONE') and \
1032                hasattr(self, 'IDXCHECK'):
1033                 self.fail("Should have failed as un-indexed search")
1034
1035             self.assertEqual(len(res11), 9)
1036
1037         except ldb.LdbError as err:
1038             enum = err.args[0]
1039             estr = err.args[1]
1040             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1041             self.assertIn(estr, "ldb FULL SEARCH disabled")
1042
1043     def test_one_unindexable_presence(self):
1044         """Testing a search"""
1045
1046         try:
1047             res11 = self.l.search(base="DC=samba,DC=org",
1048                                   scope=ldb.SCOPE_ONELEVEL,
1049                                   expression="(y=*)")
1050             if hasattr(self, 'IDX') and \
1051                not hasattr(self, 'IDXONE') and \
1052                hasattr(self, 'IDXCHECK'):
1053                 self.fail("Should have failed as un-indexed search")
1054
1055             self.assertEqual(len(res11), 24)
1056
1057         except ldb.LdbError as err:
1058             enum = err.args[0]
1059             estr = err.args[1]
1060             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1061             self.assertIn(estr, "ldb FULL SEARCH disabled")
1062
1063     def test_subtree_and_or(self):
1064         """Testing a search"""
1065
1066         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1067                               scope=ldb.SCOPE_SUBTREE,
1068                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1069         self.assertEqual(len(res11), 0)
1070
1071     def test_subtree_and_or2(self):
1072         """Testing a search"""
1073
1074         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1075                               scope=ldb.SCOPE_SUBTREE,
1076                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1077         self.assertEqual(len(res11), 0)
1078
1079     def test_subtree_and_or3(self):
1080         """Testing a search"""
1081
1082         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1083                               scope=ldb.SCOPE_SUBTREE,
1084                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1085         self.assertEqual(len(res11), 2)
1086
1087     def test_subtree_and_or4(self):
1088         """Testing a search"""
1089
1090         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1091                               scope=ldb.SCOPE_SUBTREE,
1092                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1093         self.assertEqual(len(res11), 2)
1094
1095     def test_subtree_and_or5(self):
1096         """Testing a search"""
1097
1098         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1099                               scope=ldb.SCOPE_SUBTREE,
1100                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1101         self.assertEqual(len(res11), 1)
1102
1103     def test_subtree_or_and(self):
1104         """Testing a search"""
1105
1106         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1107                               scope=ldb.SCOPE_SUBTREE,
1108                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1109         self.assertEqual(len(res11), 10)
1110
1111     def test_subtree_large_and_unique(self):
1112         """Testing a search"""
1113
1114         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1115                               scope=ldb.SCOPE_SUBTREE,
1116                               expression="(&(ou=ou10)(y=a))")
1117         self.assertEqual(len(res11), 1)
1118
1119     def test_subtree_and_none(self):
1120         """Testing a search"""
1121
1122         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1123                               scope=ldb.SCOPE_SUBTREE,
1124                               expression="(&(ou=ouX)(y=a))")
1125         self.assertEqual(len(res11), 0)
1126
1127     def test_subtree_and_idx_record(self):
1128         """Testing a search against the index record"""
1129
1130         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1131                               scope=ldb.SCOPE_SUBTREE,
1132                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1133         self.assertEqual(len(res11), 0)
1134
1135     def test_subtree_and_idxone_record(self):
1136         """Testing a search against the index record"""
1137
1138         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1139                               scope=ldb.SCOPE_SUBTREE,
1140                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1141         self.assertEqual(len(res11), 0)
1142
1143     def test_subtree_unindexable(self):
1144         """Testing a search"""
1145
1146         try:
1147             res11 = self.l.search(base="DC=samba,DC=org",
1148                                   scope=ldb.SCOPE_SUBTREE,
1149                                   expression="(y=b*)")
1150             if hasattr(self, 'IDX') and \
1151                hasattr(self, 'IDXCHECK'):
1152                 self.fail("Should have failed as un-indexed search")
1153
1154             self.assertEqual(len(res11), 9)
1155
1156         except ldb.LdbError as err:
1157             enum = err.args[0]
1158             estr = err.args[1]
1159             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1160             self.assertIn(estr, "ldb FULL SEARCH disabled")
1161
1162     def test_subtree_unindexable_presence(self):
1163         """Testing a search"""
1164
1165         try:
1166             res11 = self.l.search(base="DC=samba,DC=org",
1167                                   scope=ldb.SCOPE_SUBTREE,
1168                                   expression="(y=*)")
1169             if hasattr(self, 'IDX') and \
1170                hasattr(self, 'IDXCHECK'):
1171                 self.fail("Should have failed as un-indexed search")
1172
1173             self.assertEqual(len(res11), 24)
1174
1175         except ldb.LdbError as err:
1176             enum = err.args[0]
1177             estr = err.args[1]
1178             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1179             self.assertIn(estr, "ldb FULL SEARCH disabled")
1180
1181     def test_dn_filter_one(self):
1182         """Testing that a dn= filter succeeds
1183         (or fails with disallowDNFilter
1184         set and IDXGUID or (IDX and not IDXONE) mode)
1185         when the scope is SCOPE_ONELEVEL.
1186
1187         This should be made more consistent, but for now lock in
1188         the behaviour
1189
1190         """
1191
1192         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1193                               scope=ldb.SCOPE_ONELEVEL,
1194                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1195         if hasattr(self, 'disallowDNFilter') and \
1196            hasattr(self, 'IDX') and \
1197            (hasattr(self, 'IDXGUID') or
1198             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1199             self.assertEqual(len(res11), 0)
1200         else:
1201             self.assertEqual(len(res11), 1)
1202
1203     def test_dn_filter_subtree(self):
1204         """Testing that a dn= filter succeeds
1205         (or fails with disallowDNFilter set)
1206         when the scope is SCOPE_SUBTREE"""
1207
1208         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1209                               scope=ldb.SCOPE_SUBTREE,
1210                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1211         if hasattr(self, 'disallowDNFilter') \
1212            and hasattr(self, 'IDX'):
1213             self.assertEqual(len(res11), 0)
1214         else:
1215             self.assertEqual(len(res11), 1)
1216
1217     def test_dn_filter_base(self):
1218         """Testing that (incorrectly) a dn= filter works
1219         when the scope is SCOPE_BASE"""
1220
1221         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1222                               scope=ldb.SCOPE_BASE,
1223                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1224
1225         # At some point we should fix this, but it isn't trivial
1226         self.assertEqual(len(res11), 1)
1227
1228     def test_distinguishedName_filter_one(self):
1229         """Testing that a distinguishedName= filter succeeds
1230         when the scope is SCOPE_ONELEVEL.
1231
1232         This should be made more consistent, but for now lock in
1233         the behaviour
1234
1235         """
1236
1237         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1238                               scope=ldb.SCOPE_ONELEVEL,
1239                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1240         self.assertEqual(len(res11), 1)
1241
1242     def test_distinguishedName_filter_subtree(self):
1243         """Testing that a distinguishedName= filter succeeds
1244         when the scope is SCOPE_SUBTREE"""
1245
1246         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1247                               scope=ldb.SCOPE_SUBTREE,
1248                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1249         self.assertEqual(len(res11), 1)
1250
1251     def test_distinguishedName_filter_base(self):
1252         """Testing that (incorrectly) a distinguishedName= filter works
1253         when the scope is SCOPE_BASE"""
1254
1255         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1256                               scope=ldb.SCOPE_BASE,
1257                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1258
1259         # At some point we should fix this, but it isn't trivial
1260         self.assertEqual(len(res11), 1)
1261
1262     def test_bad_dn_filter_base(self):
1263         """Testing that a dn= filter on an invalid DN works
1264         when the scope is SCOPE_BASE but
1265         returns zero results"""
1266
1267         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1268                               scope=ldb.SCOPE_BASE,
1269                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1270
1271         # At some point we should fix this, but it isn't trivial
1272         self.assertEqual(len(res11), 0)
1273
1274
1275     def test_bad_dn_filter_one(self):
1276         """Testing that a dn= filter succeeds but returns zero
1277         results when the DN is not valid on a SCOPE_ONELEVEL search
1278
1279         """
1280
1281         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1282                               scope=ldb.SCOPE_ONELEVEL,
1283                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1284         self.assertEqual(len(res11), 0)
1285
1286     def test_bad_dn_filter_subtree(self):
1287         """Testing that a dn= filter succeeds but returns zero
1288         results when the DN is not valid on a SCOPE_SUBTREE search
1289
1290         """
1291
1292         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1293                               scope=ldb.SCOPE_SUBTREE,
1294                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1295         self.assertEqual(len(res11), 0)
1296
1297     def test_bad_distinguishedName_filter_base(self):
1298         """Testing that a distinguishedName= filter on an invalid DN works
1299         when the scope is SCOPE_BASE but
1300         returns zero results"""
1301
1302         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1303                               scope=ldb.SCOPE_BASE,
1304                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1305
1306         # At some point we should fix this, but it isn't trivial
1307         self.assertEqual(len(res11), 0)
1308
1309
1310     def test_bad_distinguishedName_filter_one(self):
1311         """Testing that a distinguishedName= filter succeeds but returns zero
1312         results when the DN is not valid on a SCOPE_ONELEVEL search
1313
1314         """
1315
1316         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1317                               scope=ldb.SCOPE_ONELEVEL,
1318                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1319         self.assertEqual(len(res11), 0)
1320
1321     def test_bad_distinguishedName_filter_subtree(self):
1322         """Testing that a distinguishedName= filter succeeds but returns zero
1323         results when the DN is not valid on a SCOPE_SUBTREE search
1324
1325         """
1326
1327         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1328                               scope=ldb.SCOPE_SUBTREE,
1329                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1330         self.assertEqual(len(res11), 0)
1331
1332     def test_bad_dn_search_base(self):
1333         """Testing with a bad base DN (SCOPE_BASE)"""
1334
1335         try:
1336             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1337                                   scope=ldb.SCOPE_BASE)
1338             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1339         except ldb.LdbError as err:
1340             enum = err.args[0]
1341             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1342
1343
1344     def test_bad_dn_search_one(self):
1345         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1346
1347         try:
1348             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1349                                   scope=ldb.SCOPE_ONELEVEL)
1350             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1351         except ldb.LdbError as err:
1352             enum = err.args[0]
1353             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1354
1355     def test_bad_dn_search_subtree(self):
1356         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1357
1358         try:
1359             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1360                                   scope=ldb.SCOPE_SUBTREE)
1361             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1362         except ldb.LdbError as err:
1363             enum = err.args[0]
1364             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1365
1366
1367
1368 # Run the search tests against an lmdb backend
1369 class SearchTestsLmdb(SearchTests):
1370
1371     def setUp(self):
1372         self.prefix = MDB_PREFIX
1373         self.index = MDB_INDEX_OBJ
1374         super(SearchTestsLmdb, self).setUp()
1375
1376     def tearDown(self):
1377         super(SearchTestsLmdb, self).tearDown()
1378
1379
1380 class IndexedSearchTests(SearchTests):
1381     """Test searches using the index, to ensure the index doesn't
1382        break things"""
1383
1384     def setUp(self):
1385         super(IndexedSearchTests, self).setUp()
1386         self.l.add({"dn": "@INDEXLIST",
1387                     "@IDXATTR": [b"x", b"y", b"ou"]})
1388         self.IDX = True
1389
1390
1391 class IndexedCheckSearchTests(IndexedSearchTests):
1392     """Test searches using the index, to ensure the index doesn't
1393        break things (full scan disabled)"""
1394
1395     def setUp(self):
1396         self.IDXCHECK = True
1397         super(IndexedCheckSearchTests, self).setUp()
1398
1399
1400 class IndexedSearchDnFilterTests(SearchTests):
1401     """Test searches using the index, to ensure the index doesn't
1402        break things"""
1403
1404     def setUp(self):
1405         super(IndexedSearchDnFilterTests, self).setUp()
1406         self.l.add({"dn": "@OPTIONS",
1407                     "disallowDNFilter": "TRUE"})
1408         self.disallowDNFilter = True
1409
1410         self.l.add({"dn": "@INDEXLIST",
1411                     "@IDXATTR": [b"x", b"y", b"ou"]})
1412         self.IDX = True
1413
1414
1415 class IndexedAndOneLevelSearchTests(SearchTests):
1416     """Test searches using the index including @IDXONE, to ensure
1417        the index doesn't break things"""
1418
1419     def setUp(self):
1420         super(IndexedAndOneLevelSearchTests, self).setUp()
1421         self.l.add({"dn": "@INDEXLIST",
1422                     "@IDXATTR": [b"x", b"y", b"ou"],
1423                     "@IDXONE": [b"1"]})
1424         self.IDX = True
1425         self.IDXONE = True
1426
1427
1428 class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
1429     """Test searches using the index including @IDXONE, to ensure
1430        the index doesn't break things (full scan disabled)"""
1431
1432     def setUp(self):
1433         self.IDXCHECK = True
1434         super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1435
1436
1437 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1438     """Test searches using the index including @IDXONE, to ensure
1439        the index doesn't break things"""
1440
1441     def setUp(self):
1442         super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1443         self.l.add({"dn": "@OPTIONS",
1444                     "disallowDNFilter": "TRUE",
1445                     "checkBaseOnSearch": "TRUE"})
1446         self.disallowDNFilter = True
1447         self.checkBaseOnSearch = True
1448
1449         self.l.add({"dn": "@INDEXLIST",
1450                     "@IDXATTR": [b"x", b"y", b"ou"],
1451                     "@IDXONE": [b"1"]})
1452         self.IDX = True
1453         self.IDXONE = True
1454
1455
1456 class GUIDIndexedSearchTests(SearchTests):
1457     """Test searches using the index, to ensure the index doesn't
1458        break things"""
1459
1460     def setUp(self):
1461         self.index = {"dn": "@INDEXLIST",
1462                       "@IDXATTR": [b"x", b"y", b"ou"],
1463                       "@IDXGUID": [b"objectUUID"],
1464                       "@IDX_DN_GUID": [b"GUID"]}
1465         super(GUIDIndexedSearchTests, self).setUp()
1466
1467         self.IDXGUID = True
1468         self.IDXONE = True
1469
1470
1471 class GUIDIndexedDNFilterSearchTests(SearchTests):
1472     """Test searches using the index, to ensure the index doesn't
1473        break things"""
1474
1475     def setUp(self):
1476         self.index = {"dn": "@INDEXLIST",
1477                       "@IDXATTR": [b"x", b"y", b"ou"],
1478                       "@IDXGUID": [b"objectUUID"],
1479                       "@IDX_DN_GUID": [b"GUID"]}
1480         super(GUIDIndexedDNFilterSearchTests, self).setUp()
1481         self.l.add({"dn": "@OPTIONS",
1482                     "disallowDNFilter": "TRUE",
1483                     "checkBaseOnSearch": "TRUE"})
1484         self.disallowDNFilter = True
1485         self.checkBaseOnSearch = True
1486         self.IDX = True
1487         self.IDXGUID = True
1488
1489
1490 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1491     """Test searches using the index including @IDXONE, to ensure
1492        the index doesn't break things"""
1493
1494     def setUp(self):
1495         self.index = {"dn": "@INDEXLIST",
1496                       "@IDXATTR": [b"x", b"y", b"ou"],
1497                       "@IDXGUID": [b"objectUUID"],
1498                       "@IDX_DN_GUID": [b"GUID"]}
1499         super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1500         self.l.add({"dn": "@OPTIONS",
1501                     "disallowDNFilter": "TRUE",
1502                     "checkBaseOnSearch": "TRUE"})
1503         self.disallowDNFilter = True
1504         self.checkBaseOnSearch = True
1505         self.IDX = True
1506         self.IDXGUID = True
1507         self.IDXONE = True
1508
1509
1510 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
1511
1512     def setUp(self):
1513         self.prefix = MDB_PREFIX
1514         super(GUIDIndexedSearchTestsLmdb, self).setUp()
1515
1516     def tearDown(self):
1517         super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1518
1519
1520 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
1521
1522     def setUp(self):
1523         self.prefix = MDB_PREFIX
1524         super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
1525
1526     def tearDown(self):
1527         super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1528
1529
1530 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
1531
1532     def setUp(self):
1533         self.prefix = MDB_PREFIX
1534         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
1535
1536     def tearDown(self):
1537         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1538
1539
1540 class AddModifyTests(LdbBaseTest):
1541     def tearDown(self):
1542         shutil.rmtree(self.testdir)
1543         super(AddModifyTests, self).tearDown()
1544
1545         # Ensure the LDB is closed now, so we close the FD
1546         del(self.l)
1547
1548     def setUp(self):
1549         super(AddModifyTests, self).setUp()
1550         self.testdir = tempdir()
1551         self.filename = os.path.join(self.testdir, "add_test.ldb")
1552         self.l = ldb.Ldb(self.url(),
1553                          flags=self.flags(),
1554                          options=["modules:rdn_name"])
1555         try:
1556             self.l.add(self.index)
1557         except AttributeError:
1558             pass
1559
1560         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1561                     "name": b"samba.org",
1562                     "objectUUID": b"0123456789abcdef"})
1563         self.l.add({"dn": "@ATTRIBUTES",
1564                     "objectUUID": "UNIQUE_INDEX"})
1565
1566     def test_add_dup(self):
1567         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1568                     "name": b"Admins",
1569                     "x": "z", "y": "a",
1570                     "objectUUID": b"0123456789abcde1"})
1571         try:
1572             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1573                         "name": b"Admins",
1574                         "x": "z", "y": "a",
1575                         "objectUUID": b"0123456789abcde2"})
1576             self.fail("Should have failed adding dupliate entry")
1577         except ldb.LdbError as err:
1578             enum = err.args[0]
1579             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1580
1581     def test_add_bad(self):
1582         try:
1583             self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
1584                         "name": b"Admins",
1585                         "x": "z", "y": "a",
1586                         "objectUUID": b"0123456789abcde1"})
1587             self.fail("Should have failed adding entry with invalid DN")
1588         except ldb.LdbError as err:
1589             enum = err.args[0]
1590             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1591
1592     def test_add_del_add(self):
1593         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1594                     "name": b"Admins",
1595                     "x": "z", "y": "a",
1596                     "objectUUID": b"0123456789abcde1"})
1597         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1598         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1599                     "name": b"Admins",
1600                     "x": "z", "y": "a",
1601                     "objectUUID": b"0123456789abcde2"})
1602
1603     def test_add_move_add(self):
1604         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1605                     "name": b"Admins",
1606                     "x": "z", "y": "a",
1607                     "objectUUID": b"0123456789abcde1"})
1608         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1609                       "OU=DUP2,DC=SAMBA,DC=ORG")
1610         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1611                     "name": b"Admins",
1612                     "x": "z", "y": "a",
1613                     "objectUUID": b"0123456789abcde2"})
1614
1615     def test_add_move_fail_move_move(self):
1616         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1617                     "name": b"Admins",
1618                     "x": "z", "y": "a",
1619                     "objectUUID": b"0123456789abcde1"})
1620         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1621                     "name": b"Admins",
1622                     "x": "z", "y": "a",
1623                     "objectUUID": b"0123456789abcde2"})
1624
1625         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1626                              scope=ldb.SCOPE_SUBTREE,
1627                              expression="(objectUUID=0123456789abcde1)")
1628         self.assertEqual(len(res2), 1)
1629         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1630
1631         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1632                              scope=ldb.SCOPE_SUBTREE,
1633                              expression="(objectUUID=0123456789abcde2)")
1634         self.assertEqual(len(res3), 1)
1635         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1636
1637         try:
1638             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1639                           "OU=DUP2,DC=SAMBA,DC=ORG")
1640             self.fail("Should have failed on duplicate DN")
1641         except ldb.LdbError as err:
1642             enum = err.args[0]
1643             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1644
1645         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1646                       "OU=DUP3,DC=SAMBA,DC=ORG")
1647
1648         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1649                       "OU=DUP2,DC=SAMBA,DC=ORG")
1650
1651         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1652                              scope=ldb.SCOPE_SUBTREE,
1653                              expression="(objectUUID=0123456789abcde1)")
1654         self.assertEqual(len(res2), 1)
1655         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1656
1657         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1658                              scope=ldb.SCOPE_SUBTREE,
1659                              expression="(objectUUID=0123456789abcde2)")
1660         self.assertEqual(len(res3), 1)
1661         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1662
1663     def test_move_missing(self):
1664         try:
1665             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1666                           "OU=DUP2,DC=SAMBA,DC=ORG")
1667             self.fail("Should have failed on missing")
1668         except ldb.LdbError as err:
1669             enum = err.args[0]
1670             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1671
1672     def test_move_missing2(self):
1673         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1674                     "name": b"Admins",
1675                     "x": "z", "y": "a",
1676                     "objectUUID": b"0123456789abcde2"})
1677
1678         try:
1679             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1680                           "OU=DUP2,DC=SAMBA,DC=ORG")
1681             self.fail("Should have failed on missing")
1682         except ldb.LdbError as err:
1683             enum = err.args[0]
1684             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1685
1686     def test_move_bad(self):
1687         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1688                     "name": b"Admins",
1689                     "x": "z", "y": "a",
1690                     "objectUUID": b"0123456789abcde2"})
1691
1692         try:
1693             self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
1694                           "OU=DUP2,DC=SAMBA,DC=ORG")
1695             self.fail("Should have failed on invalid DN")
1696         except ldb.LdbError as err:
1697             enum = err.args[0]
1698             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1699
1700     def test_move_bad2(self):
1701         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1702                     "name": b"Admins",
1703                     "x": "z", "y": "a",
1704                     "objectUUID": b"0123456789abcde2"})
1705
1706         try:
1707             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1708                           "OUXDUP2,DC=SAMBA,DC=ORG")
1709             self.fail("Should have failed on missing")
1710         except ldb.LdbError as err:
1711             enum = err.args[0]
1712             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1713
1714     def test_move_fail_move_add(self):
1715         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1716                     "name": b"Admins",
1717                     "x": "z", "y": "a",
1718                     "objectUUID": b"0123456789abcde1"})
1719         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1720                     "name": b"Admins",
1721                     "x": "z", "y": "a",
1722                     "objectUUID": b"0123456789abcde2"})
1723         try:
1724             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1725                           "OU=DUP2,DC=SAMBA,DC=ORG")
1726             self.fail("Should have failed on duplicate DN")
1727         except ldb.LdbError as err:
1728             enum = err.args[0]
1729             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1730
1731         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1732                       "OU=DUP3,DC=SAMBA,DC=ORG")
1733
1734         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1735                     "name": b"Admins",
1736                     "x": "z", "y": "a",
1737                     "objectUUID": b"0123456789abcde3"})
1738
1739
1740 class AddModifyTestsLmdb(AddModifyTests):
1741
1742     def setUp(self):
1743         self.prefix = MDB_PREFIX
1744         self.index = MDB_INDEX_OBJ
1745         super(AddModifyTestsLmdb, self).setUp()
1746
1747     def tearDown(self):
1748         super(AddModifyTestsLmdb, self).tearDown()
1749
1750
1751 class IndexedAddModifyTests(AddModifyTests):
1752     """Test searches using the index, to ensure the index doesn't
1753        break things"""
1754
1755     def setUp(self):
1756         if not hasattr(self, 'index'):
1757             self.index = {"dn": "@INDEXLIST",
1758                           "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
1759                           "@IDXONE": [b"1"]}
1760         super(IndexedAddModifyTests, self).setUp()
1761
1762     def test_duplicate_GUID(self):
1763         try:
1764             self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
1765                         "name": b"Admins",
1766                         "x": "z", "y": "a",
1767                         "objectUUID": b"0123456789abcdef"})
1768             self.fail("Should have failed adding dupliate GUID")
1769         except ldb.LdbError as err:
1770             enum = err.args[0]
1771             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1772
1773     def test_duplicate_name_dup_GUID(self):
1774         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1775                     "name": b"Admins",
1776                     "x": "z", "y": "a",
1777                     "objectUUID": b"a123456789abcdef"})
1778         try:
1779             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1780                         "name": b"Admins",
1781                         "x": "z", "y": "a",
1782                         "objectUUID": b"a123456789abcdef"})
1783             self.fail("Should have failed adding dupliate GUID")
1784         except ldb.LdbError as err:
1785             enum = err.args[0]
1786             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1787
1788     def test_duplicate_name_dup_GUID2(self):
1789         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1790                     "name": b"Admins",
1791                     "x": "z", "y": "a",
1792                     "objectUUID": b"abc3456789abcdef"})
1793         try:
1794             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1795                         "name": b"Admins",
1796                         "x": "z", "y": "a",
1797                         "objectUUID": b"aaa3456789abcdef"})
1798             self.fail("Should have failed adding dupliate DN")
1799         except ldb.LdbError as err:
1800             enum = err.args[0]
1801             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1802
1803         # Checking the GUID didn't stick in the index
1804         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1805                     "name": b"Admins",
1806                     "x": "z", "y": "a",
1807                     "objectUUID": b"aaa3456789abcdef"})
1808
1809     def test_add_dup_guid_add(self):
1810         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1811                     "name": b"Admins",
1812                     "x": "z", "y": "a",
1813                     "objectUUID": b"0123456789abcde1"})
1814         try:
1815             self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1816                         "name": b"Admins",
1817                         "x": "z", "y": "a",
1818                         "objectUUID": b"0123456789abcde1"})
1819             self.fail("Should have failed on duplicate GUID")
1820
1821         except ldb.LdbError as err:
1822             enum = err.args[0]
1823             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1824
1825         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1826                     "name": b"Admins",
1827                     "x": "z", "y": "a",
1828                     "objectUUID": b"0123456789abcde2"})
1829
1830
1831 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
1832     """Test searches using the index, to ensure the index doesn't
1833        break things"""
1834
1835     def setUp(self):
1836         self.index = {"dn": "@INDEXLIST",
1837                       "@IDXATTR": [b"x", b"y", b"ou"],
1838                       "@IDXONE": [b"1"],
1839                       "@IDXGUID": [b"objectUUID"],
1840                       "@IDX_DN_GUID": [b"GUID"]}
1841         super(GUIDIndexedAddModifyTests, self).setUp()
1842
1843
1844 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
1845     """Test GUID index behaviour insdie the transaction"""
1846
1847     def setUp(self):
1848         super(GUIDTransIndexedAddModifyTests, self).setUp()
1849         self.l.transaction_start()
1850
1851     def tearDown(self):
1852         self.l.transaction_commit()
1853         super(GUIDTransIndexedAddModifyTests, self).tearDown()
1854
1855
1856 class TransIndexedAddModifyTests(IndexedAddModifyTests):
1857     """Test index behaviour insdie the transaction"""
1858
1859     def setUp(self):
1860         super(TransIndexedAddModifyTests, self).setUp()
1861         self.l.transaction_start()
1862
1863     def tearDown(self):
1864         self.l.transaction_commit()
1865         super(TransIndexedAddModifyTests, self).tearDown()
1866
1867
1868 class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
1869
1870     def setUp(self):
1871         self.prefix = MDB_PREFIX
1872         super(GuidIndexedAddModifyTestsLmdb, self).setUp()
1873
1874     def tearDown(self):
1875         super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1876
1877
1878 class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
1879
1880     def setUp(self):
1881         self.prefix = MDB_PREFIX
1882         super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
1883
1884     def tearDown(self):
1885         super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1886
1887
1888 class BadIndexTests(LdbBaseTest):
1889     def setUp(self):
1890         super(BadIndexTests, self).setUp()
1891         self.testdir = tempdir()
1892         self.filename = os.path.join(self.testdir, "test.ldb")
1893         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
1894         if hasattr(self, 'IDXGUID'):
1895             self.ldb.add({"dn": "@INDEXLIST",
1896                           "@IDXATTR": [b"x", b"y", b"ou"],
1897                           "@IDXGUID": [b"objectUUID"],
1898                           "@IDX_DN_GUID": [b"GUID"]})
1899         else:
1900             self.ldb.add({"dn": "@INDEXLIST",
1901                           "@IDXATTR": [b"x", b"y", b"ou"]})
1902
1903         super(BadIndexTests, self).setUp()
1904
1905     def test_unique(self):
1906         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1907                       "objectUUID": b"0123456789abcde1",
1908                       "y": "1"})
1909         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1910                       "objectUUID": b"0123456789abcde2",
1911                       "y": "1"})
1912         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1913                       "objectUUID": b"0123456789abcde3",
1914                       "y": "1"})
1915
1916         res = self.ldb.search(expression="(y=1)",
1917                               base="dc=samba,dc=org")
1918         self.assertEquals(len(res), 3)
1919
1920         # Now set this to unique index, but forget to check the result
1921         try:
1922             self.ldb.add({"dn": "@ATTRIBUTES",
1923                           "y": "UNIQUE_INDEX"})
1924             self.fail()
1925         except ldb.LdbError:
1926             pass
1927
1928         # We must still have a working index
1929         res = self.ldb.search(expression="(y=1)",
1930                               base="dc=samba,dc=org")
1931         self.assertEquals(len(res), 3)
1932
1933     def test_unique_transaction(self):
1934         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1935                       "objectUUID": b"0123456789abcde1",
1936                       "y": "1"})
1937         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1938                       "objectUUID": b"0123456789abcde2",
1939                       "y": "1"})
1940         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1941                       "objectUUID": b"0123456789abcde3",
1942                       "y": "1"})
1943
1944         res = self.ldb.search(expression="(y=1)",
1945                               base="dc=samba,dc=org")
1946         self.assertEquals(len(res), 3)
1947
1948         self.ldb.transaction_start()
1949
1950         # Now set this to unique index, but forget to check the result
1951         try:
1952             self.ldb.add({"dn": "@ATTRIBUTES",
1953                           "y": "UNIQUE_INDEX"})
1954         except ldb.LdbError:
1955             pass
1956
1957         try:
1958             self.ldb.transaction_commit()
1959             self.fail()
1960
1961         except ldb.LdbError as err:
1962             enum = err.args[0]
1963             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
1964
1965         # We must still have a working index
1966         res = self.ldb.search(expression="(y=1)",
1967                               base="dc=samba,dc=org")
1968
1969         self.assertEquals(len(res), 3)
1970
1971     def test_casefold(self):
1972         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1973                       "objectUUID": b"0123456789abcde1",
1974                       "y": "a"})
1975         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1976                       "objectUUID": b"0123456789abcde2",
1977                       "y": "A"})
1978         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1979                       "objectUUID": b"0123456789abcde3",
1980                       "y": ["a", "A"]})
1981
1982         res = self.ldb.search(expression="(y=a)",
1983                               base="dc=samba,dc=org")
1984         self.assertEquals(len(res), 2)
1985
1986         self.ldb.add({"dn": "@ATTRIBUTES",
1987                       "y": "CASE_INSENSITIVE"})
1988
1989         # We must still have a working index
1990         res = self.ldb.search(expression="(y=a)",
1991                               base="dc=samba,dc=org")
1992
1993         if hasattr(self, 'IDXGUID'):
1994             self.assertEquals(len(res), 3)
1995         else:
1996             # We should not return this entry twice, but sadly
1997             # we have not yet fixed
1998             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1999             self.assertEquals(len(res), 4)
2000
2001     def test_casefold_transaction(self):
2002         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2003                       "objectUUID": b"0123456789abcde1",
2004                       "y": "a"})
2005         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2006                       "objectUUID": b"0123456789abcde2",
2007                       "y": "A"})
2008         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2009                       "objectUUID": b"0123456789abcde3",
2010                       "y": ["a", "A"]})
2011
2012         res = self.ldb.search(expression="(y=a)",
2013                               base="dc=samba,dc=org")
2014         self.assertEquals(len(res), 2)
2015
2016         self.ldb.transaction_start()
2017
2018         self.ldb.add({"dn": "@ATTRIBUTES",
2019                       "y": "CASE_INSENSITIVE"})
2020
2021         self.ldb.transaction_commit()
2022
2023         # We must still have a working index
2024         res = self.ldb.search(expression="(y=a)",
2025                               base="dc=samba,dc=org")
2026
2027         if hasattr(self, 'IDXGUID'):
2028             self.assertEquals(len(res), 3)
2029         else:
2030             # We should not return this entry twice, but sadly
2031             # we have not yet fixed
2032             # https://bugzilla.samba.org/show_bug.cgi?id=13361
2033             self.assertEquals(len(res), 4)
2034
2035     def tearDown(self):
2036         super(BadIndexTests, self).tearDown()
2037
2038
2039 class GUIDBadIndexTests(BadIndexTests):
2040     """Test Bad index things with GUID index mode"""
2041
2042     def setUp(self):
2043         self.IDXGUID = True
2044
2045         super(GUIDBadIndexTests, self).setUp()
2046
2047
2048 class DnTests(TestCase):
2049
2050     def setUp(self):
2051         super(DnTests, self).setUp()
2052         self.ldb = ldb.Ldb()
2053
2054     def tearDown(self):
2055         super(DnTests, self).tearDown()
2056         del(self.ldb)
2057
2058     def test_set_dn_invalid(self):
2059         x = ldb.Message()
2060
2061         def assign():
2062             x.dn = "astring"
2063         self.assertRaises(TypeError, assign)
2064
2065     def test_eq(self):
2066         x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2067         y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2068         self.assertEqual(x, y)
2069         y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
2070         self.assertNotEqual(x, y)
2071
2072     def test_str(self):
2073         x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
2074         self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
2075
2076     def test_repr(self):
2077         x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
2078         self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
2079
2080     def test_get_casefold_2(self):
2081         x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
2082         self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
2083
2084     def test_validate(self):
2085         x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
2086         self.assertTrue(x.validate())
2087
2088     def test_parent(self):
2089         x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
2090         self.assertEqual("bar=bloe", x.parent().__str__())
2091
2092     def test_parent_nonexistent(self):
2093         x = ldb.Dn(self.ldb, "@BLA")
2094         self.assertEqual(None, x.parent())
2095
2096     def test_is_valid(self):
2097         x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
2098         self.assertTrue(x.is_valid())
2099         x = ldb.Dn(self.ldb, "")
2100         self.assertTrue(x.is_valid())
2101
2102     def test_is_special(self):
2103         x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
2104         self.assertFalse(x.is_special())
2105         x = ldb.Dn(self.ldb, "@FOOBAR")
2106         self.assertTrue(x.is_special())
2107
2108     def test_check_special(self):
2109         x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
2110         self.assertFalse(x.check_special("FOOBAR"))
2111         x = ldb.Dn(self.ldb, "@FOOBAR")
2112         self.assertTrue(x.check_special("@FOOBAR"))
2113
2114     def test_len(self):
2115         x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
2116         self.assertEqual(2, len(x))
2117         x = ldb.Dn(self.ldb, "dc=foo21")
2118         self.assertEqual(1, len(x))
2119
2120     def test_add_child(self):
2121         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2122         self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
2123         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2124
2125     def test_add_base(self):
2126         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2127         base = ldb.Dn(self.ldb, "bla=bloe")
2128         self.assertTrue(x.add_base(base))
2129         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2130
2131     def test_add_child_str(self):
2132         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2133         self.assertTrue(x.add_child("bla=bloe"))
2134         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2135
2136     def test_add_base_str(self):
2137         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2138         base = "bla=bloe"
2139         self.assertTrue(x.add_base(base))
2140         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2141
2142     def test_add(self):
2143         x = ldb.Dn(self.ldb, "dc=foo24")
2144         y = ldb.Dn(self.ldb, "bar=bla")
2145         self.assertEqual("dc=foo24,bar=bla", str(x + y))
2146
2147     def test_remove_base_components(self):
2148         x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
2149         x.remove_base_components(len(x) - 1)
2150         self.assertEqual("dc=foo24", str(x))
2151
2152     def test_parse_ldif(self):
2153         msgs = self.ldb.parse_ldif("dn: foo=bar\n")
2154         msg = next(msgs)
2155         self.assertEqual("foo=bar", str(msg[1].dn))
2156         self.assertTrue(isinstance(msg[1], ldb.Message))
2157         ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
2158         self.assertEqual("dn: foo=bar\n\n", ldif)
2159
2160     def test_parse_ldif_more(self):
2161         msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
2162         msg = next(msgs)
2163         self.assertEqual("foo=bar", str(msg[1].dn))
2164         msg = next(msgs)
2165         self.assertEqual("bar=bar", str(msg[1].dn))
2166
2167     def test_canonical_string(self):
2168         x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
2169         self.assertEqual("/bloe/foo25", x.canonical_str())
2170
2171     def test_canonical_ex_string(self):
2172         x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
2173         self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
2174
2175     def test_ldb_is_child_of(self):
2176         """Testing ldb_dn_compare_dn"""
2177         dn1 = ldb.Dn(self.ldb, "dc=base")
2178         dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
2179         dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
2180         dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
2181
2182         self.assertTrue(dn1.is_child_of(dn1))
2183         self.assertTrue(dn2.is_child_of(dn1))
2184         self.assertTrue(dn4.is_child_of(dn1))
2185         self.assertTrue(dn4.is_child_of(dn3))
2186         self.assertTrue(dn4.is_child_of(dn4))
2187         self.assertFalse(dn3.is_child_of(dn2))
2188         self.assertFalse(dn1.is_child_of(dn4))
2189
2190     def test_ldb_is_child_of_str(self):
2191         """Testing ldb_dn_compare_dn"""
2192         dn1_str = "dc=base"
2193         dn2_str = "cn=foo,dc=base"
2194         dn3_str = "cn=bar,dc=base"
2195         dn4_str = "cn=baz,cn=bar,dc=base"
2196
2197         dn1 = ldb.Dn(self.ldb, dn1_str)
2198         dn2 = ldb.Dn(self.ldb, dn2_str)
2199         dn3 = ldb.Dn(self.ldb, dn3_str)
2200         dn4 = ldb.Dn(self.ldb, dn4_str)
2201
2202         self.assertTrue(dn1.is_child_of(dn1_str))
2203         self.assertTrue(dn2.is_child_of(dn1_str))
2204         self.assertTrue(dn4.is_child_of(dn1_str))
2205         self.assertTrue(dn4.is_child_of(dn3_str))
2206         self.assertTrue(dn4.is_child_of(dn4_str))
2207         self.assertFalse(dn3.is_child_of(dn2_str))
2208         self.assertFalse(dn1.is_child_of(dn4_str))
2209
2210     def test_get_component_name(self):
2211         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2212         self.assertEqual(dn.get_component_name(0), 'cn')
2213         self.assertEqual(dn.get_component_name(1), 'dc')
2214         self.assertEqual(dn.get_component_name(2), None)
2215         self.assertEqual(dn.get_component_name(-1), None)
2216
2217     def test_get_component_value(self):
2218         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2219         self.assertEqual(dn.get_component_value(0), 'foo')
2220         self.assertEqual(dn.get_component_value(1), 'base')
2221         self.assertEqual(dn.get_component_name(2), None)
2222         self.assertEqual(dn.get_component_name(-1), None)
2223
2224     def test_set_component(self):
2225         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2226         dn.set_component(0, 'cn', 'bar')
2227         self.assertEqual(str(dn), "cn=bar,dc=base")
2228         dn.set_component(1, 'o', 'asep')
2229         self.assertEqual(str(dn), "cn=bar,o=asep")
2230         self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
2231         self.assertEqual(str(dn), "cn=bar,o=asep")
2232         dn.set_component(1, 'o', 'a,b+c')
2233         self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
2234
2235     def test_set_component_bytes(self):
2236         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2237         dn.set_component(0, 'cn', b'bar')
2238         self.assertEqual(str(dn), "cn=bar,dc=base")
2239         dn.set_component(1, 'o', b'asep')
2240         self.assertEqual(str(dn), "cn=bar,o=asep")
2241
2242     def test_set_component_none(self):
2243         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2244         self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
2245
2246     def test_get_extended_component_null(self):
2247         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2248         self.assertEqual(dn.get_extended_component("TEST"), None)
2249
2250     def test_get_extended_component(self):
2251         self.ldb._register_test_extensions()
2252         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2253         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2254
2255     def test_set_extended_component(self):
2256         self.ldb._register_test_extensions()
2257         dn = ldb.Dn(self.ldb, "dc=base")
2258         dn.set_extended_component("TEST", "foo")
2259         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2260         dn.set_extended_component("TEST", b"bar")
2261         self.assertEqual(dn.get_extended_component("TEST"), b"bar")
2262
2263     def test_extended_str(self):
2264         self.ldb._register_test_extensions()
2265         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2266         self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
2267
2268     def test_get_rdn_name(self):
2269         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2270         self.assertEqual(dn.get_rdn_name(), 'cn')
2271
2272     def test_get_rdn_value(self):
2273         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2274         self.assertEqual(dn.get_rdn_value(), 'foo')
2275
2276     def test_get_casefold(self):
2277         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2278         self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
2279
2280     def test_get_linearized(self):
2281         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2282         self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
2283
2284     def test_is_null(self):
2285         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2286         self.assertFalse(dn.is_null())
2287
2288         dn = ldb.Dn(self.ldb, '')
2289         self.assertTrue(dn.is_null())
2290
2291
2292 class LdbMsgTests(TestCase):
2293
2294     def setUp(self):
2295         super(LdbMsgTests, self).setUp()
2296         self.msg = ldb.Message()
2297
2298     def test_init_dn(self):
2299         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
2300         self.assertEqual("dc=foo27", str(self.msg.dn))
2301
2302     def test_iter_items(self):
2303         self.assertEqual(0, len(self.msg.items()))
2304         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
2305         self.assertEqual(1, len(self.msg.items()))
2306
2307     def test_repr(self):
2308         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
2309         self.msg["dc"] = b"foo"
2310         if PY3:
2311             self.assertIn(repr(self.msg), [
2312                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
2313                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
2314             ])
2315             self.assertIn(repr(self.msg.text), [
2316                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
2317                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
2318             ])
2319         else:
2320             self.assertIn(repr(self.msg), [
2321                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})",
2322                 "Message({'dc': MessageElement(['foo']), 'dn': Dn('dc=foo29')})",
2323             ])
2324             self.assertIn(repr(self.msg.text), [
2325                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text",
2326                 "Message({'dc': MessageElement(['foo']), 'dn': Dn('dc=foo29')}).text",
2327             ])
2328
2329     def test_len(self):
2330         self.assertEqual(0, len(self.msg))
2331
2332     def test_notpresent(self):
2333         self.assertRaises(KeyError, lambda: self.msg["foo"])
2334
2335     def test_del(self):
2336         del self.msg["foo"]
2337
2338     def test_add(self):
2339         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
2340
2341     def test_add_text(self):
2342         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
2343
2344     def test_elements_empty(self):
2345         self.assertEqual([], self.msg.elements())
2346
2347     def test_elements(self):
2348         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2349         self.msg.add(el)
2350         self.assertEqual([el], self.msg.elements())
2351         self.assertEqual([el.text], self.msg.text.elements())
2352
2353     def test_add_value(self):
2354         self.assertEqual(0, len(self.msg))
2355         self.msg["foo"] = [b"foo"]
2356         self.assertEqual(1, len(self.msg))
2357
2358     def test_add_value_text(self):
2359         self.assertEqual(0, len(self.msg))
2360         self.msg["foo"] = ["foo"]
2361         self.assertEqual(1, len(self.msg))
2362
2363     def test_add_value_multiple(self):
2364         self.assertEqual(0, len(self.msg))
2365         self.msg["foo"] = [b"foo", b"bla"]
2366         self.assertEqual(1, len(self.msg))
2367         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2368
2369     def test_add_value_multiple_text(self):
2370         self.assertEqual(0, len(self.msg))
2371         self.msg["foo"] = ["foo", "bla"]
2372         self.assertEqual(1, len(self.msg))
2373         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2374
2375     def test_set_value(self):
2376         self.msg["foo"] = [b"fool"]
2377         self.assertEqual([b"fool"], list(self.msg["foo"]))
2378         self.msg["foo"] = [b"bar"]
2379         self.assertEqual([b"bar"], list(self.msg["foo"]))
2380
2381     def test_set_value_text(self):
2382         self.msg["foo"] = ["fool"]
2383         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2384         self.msg["foo"] = ["bar"]
2385         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2386
2387     def test_keys(self):
2388         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2389         self.msg["foo"] = [b"bla"]
2390         self.msg["bar"] = [b"bla"]
2391         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
2392
2393     def test_keys_text(self):
2394         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2395         self.msg["foo"] = ["bla"]
2396         self.msg["bar"] = ["bla"]
2397         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
2398
2399     def test_dn(self):
2400         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2401         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2402
2403     def test_get_dn(self):
2404         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2405         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2406
2407     def test_dn_text(self):
2408         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2409         self.assertEqual("@BASEINFO", str(self.msg.dn))
2410         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2411
2412     def test_get_dn_text(self):
2413         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2414         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2415         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2416
2417     def test_get_invalid(self):
2418         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2419         self.assertRaises(TypeError, self.msg.get, 42)
2420
2421     def test_get_other(self):
2422         self.msg["foo"] = [b"bar"]
2423         self.assertEqual(b"bar", self.msg.get("foo")[0])
2424         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2425         self.assertEqual(None, self.msg.get("foo", idx=1))
2426         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2427
2428     def test_get_other_text(self):
2429         self.msg["foo"] = ["bar"]
2430         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2431         self.assertEqual("bar", self.msg.text.get("foo")[0])
2432         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2433         self.assertEqual(None, self.msg.get("foo", idx=1))
2434         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2435
2436     def test_get_default(self):
2437         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2438         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2439
2440     def test_get_default_text(self):
2441         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2442         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2443
2444     def test_get_unknown(self):
2445         self.assertEqual(None, self.msg.get("lalalala"))
2446
2447     def test_get_unknown_text(self):
2448         self.assertEqual(None, self.msg.text.get("lalalala"))
2449
2450     def test_msg_diff(self):
2451         l = ldb.Ldb()
2452         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2453         msg1 = next(msgs)[1]
2454         msg2 = next(msgs)[1]
2455         msgdiff = l.msg_diff(msg1, msg2)
2456         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2457         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2458         self.assertEqual(1, len(msgdiff))
2459
2460     def test_equal_empty(self):
2461         msg1 = ldb.Message()
2462         msg2 = ldb.Message()
2463         self.assertEqual(msg1, msg2)
2464
2465     def test_equal_simplel(self):
2466         db = ldb.Ldb()
2467         msg1 = ldb.Message()
2468         msg1.dn = ldb.Dn(db, "foo=bar")
2469         msg2 = ldb.Message()
2470         msg2.dn = ldb.Dn(db, "foo=bar")
2471         self.assertEqual(msg1, msg2)
2472         msg1['foo'] = b'bar'
2473         msg2['foo'] = b'bar'
2474         self.assertEqual(msg1, msg2)
2475         msg2['foo'] = b'blie'
2476         self.assertNotEqual(msg1, msg2)
2477         msg2['foo'] = b'blie'
2478
2479     def test_from_dict(self):
2480         rec = {"dn": "dc=fromdict",
2481                "a1": [b"a1-val1", b"a1-val1"]}
2482         l = ldb.Ldb()
2483         # check different types of input Flags
2484         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2485             m = ldb.Message.from_dict(l, rec, flags)
2486             self.assertEqual(rec["a1"], list(m["a1"]))
2487             self.assertEqual(flags, m["a1"].flags())
2488         # check input params
2489         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2490         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2491         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2492         # Message.from_dict expects dictionary with 'dn'
2493         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2494         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2495
2496     def test_from_dict_text(self):
2497         rec = {"dn": "dc=fromdict",
2498                "a1": ["a1-val1", "a1-val1"]}
2499         l = ldb.Ldb()
2500         # check different types of input Flags
2501         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2502             m = ldb.Message.from_dict(l, rec, flags)
2503             self.assertEqual(rec["a1"], list(m.text["a1"]))
2504             self.assertEqual(flags, m.text["a1"].flags())
2505         # check input params
2506         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2507         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2508         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2509         # Message.from_dict expects dictionary with 'dn'
2510         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2511         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2512
2513     def test_copy_add_message_element(self):
2514         m = ldb.Message()
2515         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2516         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2517         mto = ldb.Message()
2518         mto["1"] = m["1"]
2519         mto["2"] = m["2"]
2520         self.assertEqual(mto["1"], m["1"])
2521         self.assertEqual(mto["2"], m["2"])
2522         mto = ldb.Message()
2523         mto.add(m["1"])
2524         mto.add(m["2"])
2525         self.assertEqual(mto["1"], m["1"])
2526         self.assertEqual(mto["2"], m["2"])
2527
2528     def test_copy_add_message_element_text(self):
2529         m = ldb.Message()
2530         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2531         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2532         mto = ldb.Message()
2533         mto["1"] = m["1"]
2534         mto["2"] = m["2"]
2535         self.assertEqual(mto["1"], m.text["1"])
2536         self.assertEqual(mto["2"], m.text["2"])
2537         mto = ldb.Message()
2538         mto.add(m["1"])
2539         mto.add(m["2"])
2540         self.assertEqual(mto.text["1"], m.text["1"])
2541         self.assertEqual(mto.text["2"], m.text["2"])
2542         self.assertEqual(mto["1"], m["1"])
2543         self.assertEqual(mto["2"], m["2"])
2544
2545
2546 class MessageElementTests(TestCase):
2547
2548     def test_cmp_element(self):
2549         x = ldb.MessageElement([b"foo"])
2550         y = ldb.MessageElement([b"foo"])
2551         z = ldb.MessageElement([b"bzr"])
2552         self.assertEqual(x, y)
2553         self.assertNotEqual(x, z)
2554
2555     def test_cmp_element_text(self):
2556         x = ldb.MessageElement([b"foo"])
2557         y = ldb.MessageElement(["foo"])
2558         self.assertEqual(x, y)
2559
2560     def test_create_iterable(self):
2561         x = ldb.MessageElement([b"foo"])
2562         self.assertEqual([b"foo"], list(x))
2563         self.assertEqual(["foo"], list(x.text))
2564
2565     def test_repr(self):
2566         x = ldb.MessageElement([b"foo"])
2567         if PY3:
2568             self.assertEqual("MessageElement([b'foo'])", repr(x))
2569             self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
2570         else:
2571             self.assertEqual("MessageElement(['foo'])", repr(x))
2572             self.assertEqual("MessageElement(['foo']).text", repr(x.text))
2573         x = ldb.MessageElement([b"foo", b"bla"])
2574         self.assertEqual(2, len(x))
2575         if PY3:
2576             self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
2577             self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
2578         else:
2579             self.assertEqual("MessageElement(['foo','bla'])", repr(x))
2580             self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
2581
2582     def test_get_item(self):
2583         x = ldb.MessageElement([b"foo", b"bar"])
2584         self.assertEqual(b"foo", x[0])
2585         self.assertEqual(b"bar", x[1])
2586         self.assertEqual(b"bar", x[-1])
2587         self.assertRaises(IndexError, lambda: x[45])
2588
2589     def test_get_item_text(self):
2590         x = ldb.MessageElement(["foo", "bar"])
2591         self.assertEqual("foo", x.text[0])
2592         self.assertEqual("bar", x.text[1])
2593         self.assertEqual("bar", x.text[-1])
2594         self.assertRaises(IndexError, lambda: x[45])
2595
2596     def test_len(self):
2597         x = ldb.MessageElement([b"foo", b"bar"])
2598         self.assertEqual(2, len(x))
2599
2600     def test_eq(self):
2601         x = ldb.MessageElement([b"foo", b"bar"])
2602         y = ldb.MessageElement([b"foo", b"bar"])
2603         self.assertEqual(y, x)
2604         x = ldb.MessageElement([b"foo"])
2605         self.assertNotEqual(y, x)
2606         y = ldb.MessageElement([b"foo"])
2607         self.assertEqual(y, x)
2608
2609     def test_extended(self):
2610         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2611         if PY3:
2612             self.assertEqual("MessageElement([b'456'])", repr(el))
2613             self.assertEqual("MessageElement([b'456']).text", repr(el.text))
2614         else:
2615             self.assertEqual("MessageElement(['456'])", repr(el))
2616             self.assertEqual("MessageElement(['456']).text", repr(el.text))
2617
2618     def test_bad_text(self):
2619         el = ldb.MessageElement(b'\xba\xdd')
2620         self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2621
2622
2623 class ModuleTests(TestCase):
2624
2625     def setUp(self):
2626         super(ModuleTests, self).setUp()
2627         self.testdir = tempdir()
2628         self.filename = os.path.join(self.testdir, "test.ldb")
2629         self.ldb = ldb.Ldb(self.filename)
2630
2631     def tearDown(self):
2632         shutil.rmtree(self.testdir)
2633         super(ModuleTests, self).setUp()
2634
2635     def test_register_module(self):
2636         class ExampleModule:
2637             name = "example"
2638         ldb.register_module(ExampleModule)
2639
2640     def test_use_module(self):
2641         ops = []
2642
2643         class ExampleModule:
2644             name = "bla"
2645
2646             def __init__(self, ldb, next):
2647                 ops.append("init")
2648                 self.next = next
2649
2650             def search(self, *args, **kwargs):
2651                 return self.next.search(*args, **kwargs)
2652
2653             def request(self, *args, **kwargs):
2654                 pass
2655
2656         ldb.register_module(ExampleModule)
2657         l = ldb.Ldb(self.filename)
2658         l.add({"dn": "@MODULES", "@LIST": "bla"})
2659         self.assertEqual([], ops)
2660         l = ldb.Ldb(self.filename)
2661         self.assertEqual(["init"], ops)
2662
2663
2664 class LdbResultTests(LdbBaseTest):
2665
2666     def setUp(self):
2667         super(LdbResultTests, self).setUp()
2668         self.testdir = tempdir()
2669         self.filename = os.path.join(self.testdir, "test.ldb")
2670         self.l = ldb.Ldb(self.url(), flags=self.flags())
2671         try:
2672             self.l.add(self.index)
2673         except AttributeError:
2674             pass
2675         self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
2676                     "objectUUID": b"0123456789abcde0"})
2677         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
2678                     "objectUUID": b"0123456789abcde1"})
2679         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
2680                     "objectUUID": b"0123456789abcde2"})
2681         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
2682                     "objectUUID": b"0123456789abcde3"})
2683         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
2684                     "objectUUID": b"0123456789abcde4"})
2685         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
2686                     "objectUUID": b"0123456789abcde5"})
2687         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
2688                     "objectUUID": b"0123456789abcde6"})
2689         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
2690                     "objectUUID": b"0123456789abcde7"})
2691         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
2692                     "objectUUID": b"0123456789abcde8"})
2693         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
2694                     "objectUUID": b"0123456789abcde9"})
2695         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
2696                     "objectUUID": b"0123456789abcdea"})
2697         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
2698                     "objectUUID": b"0123456789abcdeb"})
2699         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
2700                     "objectUUID": b"0123456789abcdec"})
2701
2702     def tearDown(self):
2703         shutil.rmtree(self.testdir)
2704         super(LdbResultTests, self).tearDown()
2705         # Ensure the LDB is closed now, so we close the FD
2706         del(self.l)
2707
2708     def test_return_type(self):
2709         res = self.l.search()
2710         self.assertEqual(str(res), "<ldb result>")
2711
2712     def test_get_msgs(self):
2713         res = self.l.search()
2714         list = res.msgs
2715
2716     def test_get_controls(self):
2717         res = self.l.search()
2718         list = res.controls
2719
2720     def test_get_referals(self):
2721         res = self.l.search()
2722         list = res.referals
2723
2724     def test_iter_msgs(self):
2725         found = False
2726         for l in self.l.search().msgs:
2727             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2728                 found = True
2729         self.assertTrue(found)
2730
2731     def test_iter_msgs_count(self):
2732         self.assertTrue(self.l.search().count > 0)
2733         # 13 objects has been added to the DC=SAMBA, DC=ORG
2734         self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2735
2736     def test_iter_controls(self):
2737         res = self.l.search().controls
2738         it = iter(res)
2739
2740     def test_create_control(self):
2741         self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2742         c = ldb.Control(self.l, "relax:1")
2743         self.assertEqual(c.critical, True)
2744         self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
2745
2746     def test_iter_refs(self):
2747         res = self.l.search().referals
2748         it = iter(res)
2749
2750     def test_search_sequence_msgs(self):
2751         found = False
2752         res = self.l.search().msgs
2753
2754         for i in range(0, len(res)):
2755             l = res[i]
2756             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2757                 found = True
2758         self.assertTrue(found)
2759
2760     def test_search_as_iter(self):
2761         found = False
2762         res = self.l.search()
2763
2764         for l in res:
2765             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2766                 found = True
2767         self.assertTrue(found)
2768
2769     def test_search_iter(self):
2770         found = False
2771         res = self.l.search_iterator()
2772
2773         for l in res:
2774             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2775                 found = True
2776         self.assertTrue(found)
2777
2778     # Show that search results can't see into a transaction
2779
2780     def test_search_against_trans(self):
2781         found11 = False
2782
2783         (r1, w1) = os.pipe()
2784
2785         (r2, w2) = os.pipe()
2786
2787         # For the first element, fork a child that will
2788         # write to the DB
2789         pid = os.fork()
2790         if pid == 0:
2791             # In the child, re-open
2792             del(self.l)
2793             gc.collect()
2794
2795             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2796             # start a transaction
2797             child_ldb.transaction_start()
2798
2799             # write to it
2800             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2801                            "name": b"samba.org",
2802                            "objectUUID": b"o123456789acbdef"})
2803
2804             os.write(w1, b"added")
2805
2806             # Now wait for the search to be done
2807             os.read(r2, 6)
2808
2809             # and commit
2810             try:
2811                 child_ldb.transaction_commit()
2812             except ldb.LdbError as err:
2813                 # We print this here to see what went wrong in the child
2814                 print(err)
2815                 os._exit(1)
2816
2817             os.write(w1, b"transaction")
2818             os._exit(0)
2819
2820         self.assertEqual(os.read(r1, 5), b"added")
2821
2822         # This should not turn up until the transaction is concluded
2823         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2824                               scope=ldb.SCOPE_BASE)
2825         self.assertEqual(len(res11), 0)
2826
2827         os.write(w2, b"search")
2828
2829         # Now wait for the transaction to be done.  This should
2830         # deadlock, but the search doesn't hold a read lock for the
2831         # iterator lifetime currently.
2832         self.assertEqual(os.read(r1, 11), b"transaction")
2833
2834         # This should now turn up, as the transaction is over
2835         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2836                               scope=ldb.SCOPE_BASE)
2837         self.assertEqual(len(res11), 1)
2838
2839         self.assertFalse(found11)
2840
2841         (got_pid, status) = os.waitpid(pid, 0)
2842         self.assertEqual(got_pid, pid)
2843
2844     def test_search_iter_against_trans(self):
2845         found = False
2846         found11 = False
2847
2848         # We need to hold this iterator open to hold the all-record
2849         # lock
2850         res = self.l.search_iterator()
2851
2852         (r1, w1) = os.pipe()
2853
2854         (r2, w2) = os.pipe()
2855
2856         # For the first element, with the sequence open (which
2857         # means with ldb locks held), fork a child that will
2858         # write to the DB
2859         pid = os.fork()
2860         if pid == 0:
2861             # In the child, re-open
2862             del(res)
2863             del(self.l)
2864             gc.collect()
2865
2866             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2867             # start a transaction
2868             child_ldb.transaction_start()
2869
2870             # write to it
2871             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2872                            "name": b"samba.org",
2873                            "objectUUID": b"o123456789acbdef"})
2874
2875             os.write(w1, b"added")
2876
2877             # Now wait for the search to be done
2878             os.read(r2, 6)
2879
2880             # and commit
2881             try:
2882                 child_ldb.transaction_commit()
2883             except ldb.LdbError as err:
2884                 # We print this here to see what went wrong in the child
2885                 print(err)
2886                 os._exit(1)
2887
2888             os.write(w1, b"transaction")
2889             os._exit(0)
2890
2891         self.assertEqual(os.read(r1, 5), b"added")
2892
2893         # This should not turn up until the transaction is concluded
2894         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2895                               scope=ldb.SCOPE_BASE)
2896         self.assertEqual(len(res11), 0)
2897
2898         os.write(w2, b"search")
2899
2900         # allow the transaction to start
2901         time.sleep(1)
2902
2903         # This should not turn up until the search finishes and
2904         # removed the read lock, but for ldb_tdb that happened as soon
2905         # as we called the first res.next()
2906         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2907                               scope=ldb.SCOPE_BASE)
2908         self.assertEqual(len(res11), 0)
2909
2910         # These results are all collected at the first next(res) call
2911         for l in res:
2912             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2913                 found = True
2914             if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2915                 found11 = True
2916
2917         # Now wait for the transaction to be done.
2918         self.assertEqual(os.read(r1, 11), b"transaction")
2919
2920         # This should now turn up, as the transaction is over and all
2921         # read locks are gone
2922         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2923                               scope=ldb.SCOPE_BASE)
2924         self.assertEqual(len(res11), 1)
2925
2926         self.assertTrue(found)
2927         self.assertFalse(found11)
2928
2929         (got_pid, status) = os.waitpid(pid, 0)
2930         self.assertEqual(got_pid, pid)
2931
2932
2933 class LdbResultTestsLmdb(LdbResultTests):
2934
2935     def setUp(self):
2936         self.prefix = MDB_PREFIX
2937         self.index = MDB_INDEX_OBJ
2938         super(LdbResultTestsLmdb, self).setUp()
2939
2940     def tearDown(self):
2941         super(LdbResultTestsLmdb, self).tearDown()
2942
2943
2944 class BadTypeTests(TestCase):
2945     def test_control(self):
2946         l = ldb.Ldb()
2947         self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
2948         self.assertRaises(TypeError, ldb.Control, ldb, 1234)
2949
2950     def test_modify(self):
2951         l = ldb.Ldb()
2952         dn = ldb.Dn(l, 'a=b')
2953         m = ldb.Message(dn)
2954         self.assertRaises(TypeError, l.modify, '<bad type>')
2955         self.assertRaises(TypeError, l.modify, m, '<bad type>')
2956
2957     def test_add(self):
2958         l = ldb.Ldb()
2959         dn = ldb.Dn(l, 'a=b')
2960         m = ldb.Message(dn)
2961         self.assertRaises(TypeError, l.add, '<bad type>')
2962         self.assertRaises(TypeError, l.add, m, '<bad type>')
2963
2964     def test_delete(self):
2965         l = ldb.Ldb()
2966         dn = ldb.Dn(l, 'a=b')
2967         self.assertRaises(TypeError, l.add, '<bad type>')
2968         self.assertRaises(TypeError, l.add, dn, '<bad type>')
2969
2970     def test_rename(self):
2971         l = ldb.Ldb()
2972         dn = ldb.Dn(l, 'a=b')
2973         self.assertRaises(TypeError, l.add, '<bad type>', dn)
2974         self.assertRaises(TypeError, l.add, dn, '<bad type>')
2975         self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
2976
2977     def test_search(self):
2978         l = ldb.Ldb()
2979         self.assertRaises(TypeError, l.search, base=1234)
2980         self.assertRaises(TypeError, l.search, scope='<bad type>')
2981         self.assertRaises(TypeError, l.search, expression=1234)
2982         self.assertRaises(TypeError, l.search, attrs='<bad type>')
2983         self.assertRaises(TypeError, l.search, controls='<bad type>')
2984
2985
2986 class VersionTests(TestCase):
2987
2988     def test_version(self):
2989         self.assertTrue(isinstance(ldb.__version__, str))
2990
2991
2992 if __name__ == '__main__':
2993     import unittest
2994     unittest.TestProgram()