lib:ldb:tests: Remove explicit comparison with False
[samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python3
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 sys.path.insert(0, "bin/python")
9 import gc
10 import time
11 import ldb
12 import shutil
13 import errno
14
15
16 TDB_PREFIX = "tdb://"
17 MDB_PREFIX = "mdb://"
18
19 MDB_INDEX_OBJ = {
20     "dn": "@INDEXLIST",
21     "@IDXONE": [b"1"],
22     "@IDXGUID": [b"objectUUID"],
23     "@IDX_DN_GUID": [b"GUID"]
24 }
25
26
27 def tempdir():
28     import tempfile
29     try:
30         dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
31     except KeyError:
32         dir_prefix = None
33     return tempfile.mkdtemp(dir=dir_prefix)
34
35
36 class NoContextTests(TestCase):
37
38     def test_valid_attr_name(self):
39         self.assertTrue(ldb.valid_attr_name("foo"))
40         self.assertFalse(ldb.valid_attr_name("24foo"))
41
42     def test_timestring(self):
43         self.assertEqual("19700101000000.0Z", ldb.timestring(0))
44         self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
45
46         self.assertEqual("00000101000000.0Z", ldb.timestring(-62167219200))
47         self.assertEqual("99991231235959.0Z", ldb.timestring(253402300799))
48
49         # should result with OSError EOVERFLOW from gmtime()
50         with self.assertRaises(OSError) as err:
51             ldb.timestring(-62167219201)
52         self.assertEqual(err.exception.errno, errno.EOVERFLOW)
53         with self.assertRaises(OSError) as err:
54             ldb.timestring(253402300800)
55         self.assertEqual(err.exception.errno, errno.EOVERFLOW)
56         with self.assertRaises(OSError) as err:
57             ldb.timestring(0x7fffffffffffffff)
58         self.assertEqual(err.exception.errno, errno.EOVERFLOW)
59
60     def test_string_to_time(self):
61         self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
62         self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
63
64         self.assertEqual(-62167219200, ldb.string_to_time("00000101000000.0Z"))
65         self.assertEqual(253402300799, ldb.string_to_time("99991231235959.0Z"))
66
67     def test_binary_encode(self):
68         encoded = ldb.binary_encode(b'test\\x')
69         decoded = ldb.binary_decode(encoded)
70         self.assertEqual(decoded, b'test\\x')
71
72         encoded2 = ldb.binary_encode('test\\x')
73         self.assertEqual(encoded2, encoded)
74
75
76 class LdbBaseTest(TestCase):
77     def setUp(self):
78         super(LdbBaseTest, self).setUp()
79         try:
80             if self.prefix is None:
81                 self.prefix = TDB_PREFIX
82         except AttributeError:
83             self.prefix = TDB_PREFIX
84
85     def tearDown(self):
86         super(LdbBaseTest, self).tearDown()
87
88     def url(self):
89         return self.prefix + self.filename
90
91     def flags(self):
92         if self.prefix == MDB_PREFIX:
93             return ldb.FLG_NOSYNC
94         else:
95             return 0
96
97
98 class SimpleLdb(LdbBaseTest):
99
100     def setUp(self):
101         super(SimpleLdb, self).setUp()
102         self.testdir = tempdir()
103         self.filename = os.path.join(self.testdir, "test.ldb")
104         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
105         try:
106             self.ldb.add(self.index)
107         except AttributeError:
108             pass
109
110     def tearDown(self):
111         shutil.rmtree(self.testdir)
112         super(SimpleLdb, self).tearDown()
113         # Ensure the LDB is closed now, so we close the FD
114         del(self.ldb)
115
116     def test_connect(self):
117         ldb.Ldb(self.url(), flags=self.flags())
118
119     def test_connect_none(self):
120         ldb.Ldb()
121
122     def test_connect_later(self):
123         x = ldb.Ldb()
124         x.connect(self.url(), flags=self.flags())
125
126     def test_repr(self):
127         x = ldb.Ldb()
128         self.assertTrue(repr(x).startswith("<ldb connection"))
129
130     def test_set_create_perms(self):
131         x = ldb.Ldb()
132         x.set_create_perms(0o600)
133
134     def test_modules_none(self):
135         x = ldb.Ldb()
136         self.assertEqual([], x.modules())
137
138     def test_modules_tdb(self):
139         x = ldb.Ldb(self.url(), flags=self.flags())
140         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
141
142     def test_firstmodule_none(self):
143         x = ldb.Ldb()
144         self.assertEqual(x.firstmodule, None)
145
146     def test_firstmodule_tdb(self):
147         x = ldb.Ldb(self.url(), flags=self.flags())
148         mod = x.firstmodule
149         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
150
151     def test_search(self):
152         l = ldb.Ldb(self.url(), flags=self.flags())
153         self.assertEqual(len(l.search()), 0)
154
155     def test_search_controls(self):
156         l = ldb.Ldb(self.url(), flags=self.flags())
157         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
158
159     def test_utf8_ldb_Dn(self):
160         l = ldb.Ldb(self.url(), flags=self.flags())
161         dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
162
163     def test_utf8_encoded_ldb_Dn(self):
164         l = ldb.Ldb(self.url(), flags=self.flags())
165         dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
166         try:
167             dn = ldb.Dn(l, dn_encoded_utf8)
168         except UnicodeDecodeError as e:
169                 raise
170         except TypeError as te:
171            p3errors = ["argument 2 must be str, not bytes",
172                        "Can't convert 'bytes' object to str implicitly"]
173            self.assertIn(str(te), p3errors)
174
175     def test_search_attrs(self):
176         l = ldb.Ldb(self.url(), flags=self.flags())
177         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
178
179     def test_search_string_dn(self):
180         l = ldb.Ldb(self.url(), flags=self.flags())
181         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
182
183     def test_search_attr_string(self):
184         l = ldb.Ldb(self.url(), flags=self.flags())
185         self.assertRaises(TypeError, l.search, attrs="dc")
186         self.assertRaises(TypeError, l.search, attrs=b"dc")
187
188     def test_opaque(self):
189         l = ldb.Ldb(self.url(), flags=self.flags())
190         l.set_opaque("my_opaque", l)
191         self.assertTrue(l.get_opaque("my_opaque") is not None)
192         self.assertEqual(None, l.get_opaque("unknown"))
193
194     def test_search_scope_base_empty_db(self):
195         l = ldb.Ldb(self.url(), flags=self.flags())
196         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
197                                       ldb.SCOPE_BASE)), 0)
198
199     def test_search_scope_onelevel_empty_db(self):
200         l = ldb.Ldb(self.url(), flags=self.flags())
201         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
202                                       ldb.SCOPE_ONELEVEL)), 0)
203
204     def test_delete(self):
205         l = ldb.Ldb(self.url(), flags=self.flags())
206         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
207
208     def test_delete_w_unhandled_ctrl(self):
209         l = ldb.Ldb(self.url(), flags=self.flags())
210         m = ldb.Message()
211         m.dn = ldb.Dn(l, "dc=foo1")
212         m["b"] = [b"a"]
213         m["objectUUID"] = b"0123456789abcdef"
214         l.add(m)
215         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
216         l.delete(m.dn)
217
218     def test_contains(self):
219         name = self.url()
220         l = ldb.Ldb(name, flags=self.flags())
221         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
222         l = ldb.Ldb(name, flags=self.flags())
223         m = ldb.Message()
224         m.dn = ldb.Dn(l, "dc=foo3")
225         m["b"] = ["a"]
226         m["objectUUID"] = b"0123456789abcdef"
227         l.add(m)
228         try:
229             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
230             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
231         finally:
232             l.delete(m.dn)
233
234     def test_get_config_basedn(self):
235         l = ldb.Ldb(self.url(), flags=self.flags())
236         self.assertEqual(None, l.get_config_basedn())
237
238     def test_get_root_basedn(self):
239         l = ldb.Ldb(self.url(), flags=self.flags())
240         self.assertEqual(None, l.get_root_basedn())
241
242     def test_get_schema_basedn(self):
243         l = ldb.Ldb(self.url(), flags=self.flags())
244         self.assertEqual(None, l.get_schema_basedn())
245
246     def test_get_default_basedn(self):
247         l = ldb.Ldb(self.url(), flags=self.flags())
248         self.assertEqual(None, l.get_default_basedn())
249
250     def test_add(self):
251         l = ldb.Ldb(self.url(), flags=self.flags())
252         m = ldb.Message()
253         m.dn = ldb.Dn(l, "dc=foo4")
254         m["bla"] = b"bla"
255         m["objectUUID"] = b"0123456789abcdef"
256         self.assertEqual(len(l.search()), 0)
257         l.add(m)
258         try:
259             self.assertEqual(len(l.search()), 1)
260         finally:
261             l.delete(ldb.Dn(l, "dc=foo4"))
262
263     def test_search_iterator(self):
264         l = ldb.Ldb(self.url(), flags=self.flags())
265         s = l.search_iterator()
266         s.abandon()
267         try:
268             for me in s:
269                 self.fail()
270             self.fail()
271         except RuntimeError as re:
272             pass
273         try:
274             s.abandon()
275             self.fail()
276         except RuntimeError as re:
277             pass
278         try:
279             s.result()
280             self.fail()
281         except RuntimeError as re:
282             pass
283
284         s = l.search_iterator()
285         count = 0
286         for me in s:
287             self.assertTrue(isinstance(me, ldb.Message))
288             count += 1
289         r = s.result()
290         self.assertEqual(len(r), 0)
291         self.assertEqual(count, 0)
292
293         m1 = ldb.Message()
294         m1.dn = ldb.Dn(l, "dc=foo4")
295         m1["bla"] = b"bla"
296         m1["objectUUID"] = b"0123456789abcdef"
297         l.add(m1)
298         try:
299             s = l.search_iterator()
300             msgs = []
301             for me in s:
302                 self.assertTrue(isinstance(me, ldb.Message))
303                 count += 1
304                 msgs.append(me)
305             r = s.result()
306             self.assertEqual(len(r), 0)
307             self.assertEqual(len(msgs), 1)
308             self.assertEqual(msgs[0].dn, m1.dn)
309
310             m2 = ldb.Message()
311             m2.dn = ldb.Dn(l, "dc=foo5")
312             m2["bla"] = b"bla"
313             m2["objectUUID"] = b"0123456789abcdee"
314             l.add(m2)
315
316             s = l.search_iterator()
317             msgs = []
318             for me in s:
319                 self.assertTrue(isinstance(me, ldb.Message))
320                 count += 1
321                 msgs.append(me)
322             r = s.result()
323             self.assertEqual(len(r), 0)
324             self.assertEqual(len(msgs), 2)
325             if msgs[0].dn == m1.dn:
326                 self.assertEqual(msgs[0].dn, m1.dn)
327                 self.assertEqual(msgs[1].dn, m2.dn)
328             else:
329                 self.assertEqual(msgs[0].dn, m2.dn)
330                 self.assertEqual(msgs[1].dn, m1.dn)
331
332             s = l.search_iterator()
333             msgs = []
334             for me in s:
335                 self.assertTrue(isinstance(me, ldb.Message))
336                 count += 1
337                 msgs.append(me)
338                 break
339             try:
340                 s.result()
341                 self.fail()
342             except RuntimeError as re:
343                 pass
344             for me in s:
345                 self.assertTrue(isinstance(me, ldb.Message))
346                 count += 1
347                 msgs.append(me)
348                 break
349             for me in s:
350                 self.fail()
351
352             r = s.result()
353             self.assertEqual(len(r), 0)
354             self.assertEqual(len(msgs), 2)
355             if msgs[0].dn == m1.dn:
356                 self.assertEqual(msgs[0].dn, m1.dn)
357                 self.assertEqual(msgs[1].dn, m2.dn)
358             else:
359                 self.assertEqual(msgs[0].dn, m2.dn)
360                 self.assertEqual(msgs[1].dn, m1.dn)
361         finally:
362             l.delete(ldb.Dn(l, "dc=foo4"))
363             l.delete(ldb.Dn(l, "dc=foo5"))
364
365     def test_add_text(self):
366         l = ldb.Ldb(self.url(), flags=self.flags())
367         m = ldb.Message()
368         m.dn = ldb.Dn(l, "dc=foo4")
369         m["bla"] = "bla"
370         m["objectUUID"] = b"0123456789abcdef"
371         self.assertEqual(len(l.search()), 0)
372         l.add(m)
373         try:
374             self.assertEqual(len(l.search()), 1)
375         finally:
376             l.delete(ldb.Dn(l, "dc=foo4"))
377
378     def test_add_w_unhandled_ctrl(self):
379         l = ldb.Ldb(self.url(), flags=self.flags())
380         m = ldb.Message()
381         m.dn = ldb.Dn(l, "dc=foo4")
382         m["bla"] = b"bla"
383         self.assertEqual(len(l.search()), 0)
384         self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
385
386     def test_add_dict(self):
387         l = ldb.Ldb(self.url(), flags=self.flags())
388         m = {"dn": ldb.Dn(l, "dc=foo5"),
389              "bla": b"bla",
390              "objectUUID": b"0123456789abcdef"}
391         self.assertEqual(len(l.search()), 0)
392         l.add(m)
393         try:
394             self.assertEqual(len(l.search()), 1)
395         finally:
396             l.delete(ldb.Dn(l, "dc=foo5"))
397
398     def test_add_dict_text(self):
399         l = ldb.Ldb(self.url(), flags=self.flags())
400         m = {"dn": ldb.Dn(l, "dc=foo5"),
401              "bla": "bla",
402              "objectUUID": b"0123456789abcdef"}
403         self.assertEqual(len(l.search()), 0)
404         l.add(m)
405         try:
406             self.assertEqual(len(l.search()), 1)
407         finally:
408             l.delete(ldb.Dn(l, "dc=foo5"))
409
410     def test_add_dict_string_dn(self):
411         l = ldb.Ldb(self.url(), flags=self.flags())
412         m = {"dn": "dc=foo6", "bla": b"bla",
413              "objectUUID": b"0123456789abcdef"}
414         self.assertEqual(len(l.search()), 0)
415         l.add(m)
416         try:
417             self.assertEqual(len(l.search()), 1)
418         finally:
419             l.delete(ldb.Dn(l, "dc=foo6"))
420
421     def test_add_dict_bytes_dn(self):
422         l = ldb.Ldb(self.url(), flags=self.flags())
423         m = {"dn": b"dc=foo6", "bla": b"bla",
424              "objectUUID": b"0123456789abcdef"}
425         self.assertEqual(len(l.search()), 0)
426         l.add(m)
427         try:
428             self.assertEqual(len(l.search()), 1)
429         finally:
430             l.delete(ldb.Dn(l, "dc=foo6"))
431
432     def test_rename(self):
433         l = ldb.Ldb(self.url(), flags=self.flags())
434         m = ldb.Message()
435         m.dn = ldb.Dn(l, "dc=foo7")
436         m["bla"] = b"bla"
437         m["objectUUID"] = b"0123456789abcdef"
438         self.assertEqual(len(l.search()), 0)
439         l.add(m)
440         try:
441             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
442             self.assertEqual(len(l.search()), 1)
443         finally:
444             l.delete(ldb.Dn(l, "dc=bar"))
445
446     def test_rename_string_dns(self):
447         l = ldb.Ldb(self.url(), flags=self.flags())
448         m = ldb.Message()
449         m.dn = ldb.Dn(l, "dc=foo8")
450         m["bla"] = b"bla"
451         m["objectUUID"] = b"0123456789abcdef"
452         self.assertEqual(len(l.search()), 0)
453         l.add(m)
454         self.assertEqual(len(l.search()), 1)
455         try:
456             l.rename("dc=foo8", "dc=bar")
457             self.assertEqual(len(l.search()), 1)
458         finally:
459             l.delete(ldb.Dn(l, "dc=bar"))
460
461     def test_rename_bad_string_dns(self):
462         l = ldb.Ldb(self.url(), flags=self.flags())
463         m = ldb.Message()
464         m.dn = ldb.Dn(l, "dc=foo8")
465         m["bla"] = b"bla"
466         m["objectUUID"] = b"0123456789abcdef"
467         self.assertEqual(len(l.search()), 0)
468         l.add(m)
469         self.assertEqual(len(l.search()), 1)
470         self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
471         self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
472         l.delete(ldb.Dn(l, "dc=foo8"))
473
474     def test_empty_dn(self):
475         l = ldb.Ldb(self.url(), flags=self.flags())
476         self.assertEqual(0, len(l.search()))
477         m = ldb.Message()
478         m.dn = ldb.Dn(l, "dc=empty")
479         m["objectUUID"] = b"0123456789abcdef"
480         l.add(m)
481         rm = l.search()
482         self.assertEqual(1, len(rm))
483         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
484                          set(rm[0].keys()))
485
486         rm = l.search(m.dn)
487         self.assertEqual(1, len(rm))
488         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
489                          set(rm[0].keys()))
490         rm = l.search(m.dn, attrs=["blah"])
491         self.assertEqual(1, len(rm))
492         self.assertEqual(0, len(rm[0]))
493
494     def test_modify_delete(self):
495         l = ldb.Ldb(self.url(), flags=self.flags())
496         m = ldb.Message()
497         m.dn = ldb.Dn(l, "dc=modifydelete")
498         m["bla"] = [b"1234"]
499         m["objectUUID"] = b"0123456789abcdef"
500         l.add(m)
501         rm = l.search(m.dn)[0]
502         self.assertEqual([b"1234"], list(rm["bla"]))
503         try:
504             m = ldb.Message()
505             m.dn = ldb.Dn(l, "dc=modifydelete")
506             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
507             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
508             l.modify(m)
509             rm = l.search(m.dn)
510             self.assertEqual(1, len(rm))
511             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
512                              set(rm[0].keys()))
513             rm = l.search(m.dn, attrs=["bla"])
514             self.assertEqual(1, len(rm))
515             self.assertEqual(0, len(rm[0]))
516         finally:
517             l.delete(ldb.Dn(l, "dc=modifydelete"))
518
519     def test_modify_delete_text(self):
520         l = ldb.Ldb(self.url(), flags=self.flags())
521         m = ldb.Message()
522         m.dn = ldb.Dn(l, "dc=modifydelete")
523         m.text["bla"] = ["1234"]
524         m["objectUUID"] = b"0123456789abcdef"
525         l.add(m)
526         rm = l.search(m.dn)[0]
527         self.assertEqual(["1234"], list(rm.text["bla"]))
528         try:
529             m = ldb.Message()
530             m.dn = ldb.Dn(l, "dc=modifydelete")
531             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
532             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
533             l.modify(m)
534             rm = l.search(m.dn)
535             self.assertEqual(1, len(rm))
536             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
537                              set(rm[0].keys()))
538             rm = l.search(m.dn, attrs=["bla"])
539             self.assertEqual(1, len(rm))
540             self.assertEqual(0, len(rm[0]))
541         finally:
542             l.delete(ldb.Dn(l, "dc=modifydelete"))
543
544     def test_modify_add(self):
545         l = ldb.Ldb(self.url(), flags=self.flags())
546         m = ldb.Message()
547         m.dn = ldb.Dn(l, "dc=add")
548         m["bla"] = [b"1234"]
549         m["objectUUID"] = b"0123456789abcdef"
550         l.add(m)
551         try:
552             m = ldb.Message()
553             m.dn = ldb.Dn(l, "dc=add")
554             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
555             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
556             l.modify(m)
557             rm = l.search(m.dn)[0]
558             self.assertEqual(3, len(rm))
559             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
560         finally:
561             l.delete(ldb.Dn(l, "dc=add"))
562
563     def test_modify_add_text(self):
564         l = ldb.Ldb(self.url(), flags=self.flags())
565         m = ldb.Message()
566         m.dn = ldb.Dn(l, "dc=add")
567         m.text["bla"] = ["1234"]
568         m["objectUUID"] = b"0123456789abcdef"
569         l.add(m)
570         try:
571             m = ldb.Message()
572             m.dn = ldb.Dn(l, "dc=add")
573             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
574             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
575             l.modify(m)
576             rm = l.search(m.dn)[0]
577             self.assertEqual(3, len(rm))
578             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
579         finally:
580             l.delete(ldb.Dn(l, "dc=add"))
581
582     def test_modify_replace(self):
583         l = ldb.Ldb(self.url(), flags=self.flags())
584         m = ldb.Message()
585         m.dn = ldb.Dn(l, "dc=modify2")
586         m["bla"] = [b"1234", b"456"]
587         m["objectUUID"] = b"0123456789abcdef"
588         l.add(m)
589         try:
590             m = ldb.Message()
591             m.dn = ldb.Dn(l, "dc=modify2")
592             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
593             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
594             l.modify(m)
595             rm = l.search(m.dn)[0]
596             self.assertEqual(3, len(rm))
597             self.assertEqual([b"789"], list(rm["bla"]))
598             rm = l.search(m.dn, attrs=["bla"])[0]
599             self.assertEqual(1, len(rm))
600         finally:
601             l.delete(ldb.Dn(l, "dc=modify2"))
602
603     def test_modify_replace_text(self):
604         l = ldb.Ldb(self.url(), flags=self.flags())
605         m = ldb.Message()
606         m.dn = ldb.Dn(l, "dc=modify2")
607         m.text["bla"] = ["1234", "456"]
608         m["objectUUID"] = b"0123456789abcdef"
609         l.add(m)
610         try:
611             m = ldb.Message()
612             m.dn = ldb.Dn(l, "dc=modify2")
613             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
614             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
615             l.modify(m)
616             rm = l.search(m.dn)[0]
617             self.assertEqual(3, len(rm))
618             self.assertEqual(["789"], list(rm.text["bla"]))
619             rm = l.search(m.dn, attrs=["bla"])[0]
620             self.assertEqual(1, len(rm))
621         finally:
622             l.delete(ldb.Dn(l, "dc=modify2"))
623
624     def test_modify_flags_change(self):
625         l = ldb.Ldb(self.url(), flags=self.flags())
626         m = ldb.Message()
627         m.dn = ldb.Dn(l, "dc=add")
628         m["bla"] = [b"1234"]
629         m["objectUUID"] = b"0123456789abcdef"
630         l.add(m)
631         try:
632             m = ldb.Message()
633             m.dn = ldb.Dn(l, "dc=add")
634             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
635             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
636             l.modify(m)
637             rm = l.search(m.dn)[0]
638             self.assertEqual(3, len(rm))
639             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
640
641             # Now create another modify, but switch the flags before we do it
642             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
643             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
644             l.modify(m)
645             rm = l.search(m.dn, attrs=["bla"])[0]
646             self.assertEqual(1, len(rm))
647             self.assertEqual([b"1234"], list(rm["bla"]))
648         finally:
649             l.delete(ldb.Dn(l, "dc=add"))
650
651     def test_modify_flags_change_text(self):
652         l = ldb.Ldb(self.url(), flags=self.flags())
653         m = ldb.Message()
654         m.dn = ldb.Dn(l, "dc=add")
655         m.text["bla"] = ["1234"]
656         m["objectUUID"] = b"0123456789abcdef"
657         l.add(m)
658         try:
659             m = ldb.Message()
660             m.dn = ldb.Dn(l, "dc=add")
661             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
662             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
663             l.modify(m)
664             rm = l.search(m.dn)[0]
665             self.assertEqual(3, len(rm))
666             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
667
668             # Now create another modify, but switch the flags before we do it
669             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
670             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
671             l.modify(m)
672             rm = l.search(m.dn, attrs=["bla"])[0]
673             self.assertEqual(1, len(rm))
674             self.assertEqual(["1234"], list(rm.text["bla"]))
675         finally:
676             l.delete(ldb.Dn(l, "dc=add"))
677
678     def test_transaction_commit(self):
679         l = ldb.Ldb(self.url(), flags=self.flags())
680         l.transaction_start()
681         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
682         m["foo"] = [b"bar"]
683         m["objectUUID"] = b"0123456789abcdef"
684         l.add(m)
685         l.transaction_commit()
686         l.delete(m.dn)
687
688     def test_transaction_cancel(self):
689         l = ldb.Ldb(self.url(), flags=self.flags())
690         l.transaction_start()
691         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
692         m["foo"] = [b"bar"]
693         m["objectUUID"] = b"0123456789abcdee"
694         l.add(m)
695         l.transaction_cancel()
696         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
697
698     def test_set_debug(self):
699         def my_report_fn(level, text):
700             pass
701         l = ldb.Ldb(self.url(), flags=self.flags())
702         l.set_debug(my_report_fn)
703
704     def test_zero_byte_string(self):
705         """Testing we do not get trapped in the \0 byte in a property string."""
706         l = ldb.Ldb(self.url(), flags=self.flags())
707         l.add({
708             "dn": b"dc=somedn",
709             "objectclass": b"user",
710             "cN": b"LDAPtestUSER",
711             "givenname": b"ldap",
712             "displayname": b"foo\0bar",
713             "objectUUID": b"0123456789abcdef"
714         })
715         res = l.search(expression="(dn=dc=somedn)")
716         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
717
718     def test_no_crash_broken_expr(self):
719         l = ldb.Ldb(self.url(), flags=self.flags())
720         self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
721
722 # Run the SimpleLdb tests against an lmdb backend
723
724
725 class SimpleLdbLmdb(SimpleLdb):
726
727     def setUp(self):
728         if os.environ.get('HAVE_LMDB', '1') == '0':
729             self.skipTest("No lmdb backend")
730         self.prefix = MDB_PREFIX
731         self.index = MDB_INDEX_OBJ
732         super(SimpleLdbLmdb, self).setUp()
733
734     def tearDown(self):
735         super(SimpleLdbLmdb, self).tearDown()
736
737
738 class SimpleLdbNoLmdb(LdbBaseTest):
739
740     def setUp(self):
741         if os.environ.get('HAVE_LMDB', '1') != '0':
742             self.skipTest("lmdb backend enabled")
743         self.prefix = MDB_PREFIX
744         self.index = MDB_INDEX_OBJ
745         super(SimpleLdbNoLmdb, self).setUp()
746
747     def tearDown(self):
748         super(SimpleLdbNoLmdb, self).tearDown()
749
750     def test_lmdb_disabled(self):
751         self.testdir = tempdir()
752         self.filename = os.path.join(self.testdir, "test.ldb")
753         try:
754             self.ldb = ldb.Ldb(self.url(), flags=self.flags())
755             self.fail("Should have failed on missing LMDB")
756         except ldb.LdbError as err:
757             enum = err.args[0]
758             self.assertEqual(enum, ldb.ERR_OTHER)
759
760
761 class SearchTests(LdbBaseTest):
762     def tearDown(self):
763         shutil.rmtree(self.testdir)
764         super(SearchTests, self).tearDown()
765
766         # Ensure the LDB is closed now, so we close the FD
767         del(self.l)
768
769     def setUp(self):
770         super(SearchTests, self).setUp()
771         self.testdir = tempdir()
772         self.filename = os.path.join(self.testdir, "search_test.ldb")
773         options = ["modules:rdn_name"]
774         if hasattr(self, 'IDXCHECK'):
775             options.append("disable_full_db_scan_for_self_test:1")
776         self.l = ldb.Ldb(self.url(),
777                          flags=self.flags(),
778                          options=options)
779         try:
780             self.l.add(self.index)
781         except AttributeError:
782             pass
783
784         self.l.add({"dn": "@ATTRIBUTES",
785                     "DC": "CASE_INSENSITIVE"})
786
787         # Note that we can't use the name objectGUID here, as we
788         # want to stay clear of the objectGUID handler in LDB and
789         # instead use just the 16 bytes raw, which we just keep
790         # to printable chars here for ease of handling.
791
792         self.l.add({"dn": "DC=ORG",
793                     "name": b"org",
794                     "objectUUID": b"0000000000abcdef"})
795         self.l.add({"dn": "DC=EXAMPLE,DC=ORG",
796                     "name": b"org",
797                     "objectUUID": b"0000000001abcdef"})
798         self.l.add({"dn": "OU=OU1,DC=EXAMPLE,DC=ORG",
799                     "name": b"OU #1",
800                     "x": "y", "y": "a",
801                     "objectUUID": b"0023456789abcde3"})
802         self.l.add({"dn": "OU=OU2,DC=EXAMPLE,DC=ORG",
803                     "name": b"OU #2",
804                     "x": "y", "y": "a",
805                     "objectUUID": b"0023456789abcde4"})
806         self.l.add({"dn": "OU=OU3,DC=EXAMPLE,DC=ORG",
807                     "name": b"OU #3",
808                     "x": "y", "y": "a",
809                     "objectUUID": b"0023456789abcde5"})
810         self.l.add({"dn": "OU=OU4,DC=EXAMPLE,DC=ORG",
811                     "name": b"OU #4",
812                     "x": "z", "y": "b",
813                     "objectUUID": b"0023456789abcde6"})
814         self.l.add({"dn": "OU=OU5,DC=EXAMPLE,DC=ORG",
815                     "name": b"OU #5",
816                     "x": "y", "y": "a",
817                     "objectUUID": b"0023456789abcde7"})
818         self.l.add({"dn": "OU=OU6,DC=EXAMPLE,DC=ORG",
819                     "name": b"OU #6",
820                     "x": "y", "y": "a",
821                     "objectUUID": b"0023456789abcde8"})
822         self.l.add({"dn": "OU=OU7,DC=EXAMPLE,DC=ORG",
823                     "name": b"OU #7",
824                     "x": "y", "y": "c",
825                     "objectUUID": b"0023456789abcde9"})
826         self.l.add({"dn": "OU=OU8,DC=EXAMPLE,DC=ORG",
827                     "name": b"OU #8",
828                     "x": "y", "y": "b",
829                     "objectUUID": b"0023456789abcde0"})
830         self.l.add({"dn": "OU=OU9,DC=EXAMPLE,DC=ORG",
831                     "name": b"OU #9",
832                     "x": "y", "y": "a",
833                     "objectUUID": b"0023456789abcdea"})
834
835         self.l.add({"dn": "DC=EXAMPLE,DC=COM",
836                     "name": b"org",
837                     "objectUUID": b"0000000011abcdef"})
838
839         self.l.add({"dn": "DC=EXAMPLE,DC=NET",
840                     "name": b"org",
841                     "objectUUID": b"0000000021abcdef"})
842
843         self.l.add({"dn": "OU=UNIQUE,DC=EXAMPLE,DC=NET",
844                     "objectUUID": b"0000000022abcdef"})
845
846         self.l.add({"dn": "DC=SAMBA,DC=ORG",
847                     "name": b"samba.org",
848                     "objectUUID": b"0123456789abcdef"})
849         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
850                     "name": b"Admins",
851                     "x": "z", "y": "a",
852                     "objectUUID": b"0123456789abcde1"})
853         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
854                     "name": b"Users",
855                     "x": "z", "y": "a",
856                     "objectUUID": b"0123456789abcde2"})
857         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
858                     "name": b"OU #1",
859                     "x": "y", "y": "a",
860                     "objectUUID": b"0123456789abcde3"})
861         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
862                     "name": b"OU #2",
863                     "x": "y", "y": "a",
864                     "objectUUID": b"0123456789abcde4"})
865         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
866                     "name": b"OU #3",
867                     "x": "y", "y": "a",
868                     "objectUUID": b"0123456789abcde5"})
869         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
870                     "name": b"OU #4",
871                     "x": "y", "y": "a",
872                     "objectUUID": b"0123456789abcde6"})
873         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
874                     "name": b"OU #5",
875                     "x": "y", "y": "a",
876                     "objectUUID": b"0123456789abcde7"})
877         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
878                     "name": b"OU #6",
879                     "x": "y", "y": "a",
880                     "objectUUID": b"0123456789abcde8"})
881         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
882                     "name": b"OU #7",
883                     "x": "y", "y": "a",
884                     "objectUUID": b"0123456789abcde9"})
885         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
886                     "name": b"OU #8",
887                     "x": "y", "y": "a",
888                     "objectUUID": b"0123456789abcde0"})
889         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
890                     "name": b"OU #9",
891                     "x": "y", "y": "a",
892                     "objectUUID": b"0123456789abcdea"})
893         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
894                     "name": b"OU #10",
895                     "x": "y", "y": "a",
896                     "objectUUID": b"0123456789abcdeb"})
897         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
898                     "name": b"OU #10",
899                     "x": "y", "y": "a",
900                     "objectUUID": b"0123456789abcdec"})
901         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
902                     "name": b"OU #10",
903                     "x": "y", "y": "b",
904                     "objectUUID": b"0123456789abcded"})
905         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
906                     "name": b"OU #10",
907                     "x": "x", "y": "b",
908                     "objectUUID": b"0123456789abcdee"})
909         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
910                     "name": b"OU #10",
911                     "x": "x", "y": "b",
912                     "objectUUID": b"0123456789abcd01"})
913         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
914                     "name": b"OU #10",
915                     "x": "x", "y": "b",
916                     "objectUUID": b"0123456789abcd02"})
917         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
918                     "name": b"OU #10",
919                     "x": "x", "y": "b",
920                     "objectUUID": b"0123456789abcd03"})
921         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
922                     "name": b"OU #10",
923                     "x": "x", "y": "b",
924                     "objectUUID": b"0123456789abcd04"})
925         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
926                     "name": b"OU #10",
927                     "x": "x", "y": "b",
928                     "objectUUID": b"0123456789abcd05"})
929         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
930                     "name": b"OU #10",
931                     "x": "x", "y": "b",
932                     "objectUUID": b"0123456789abcd06"})
933         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
934                     "name": b"OU #10",
935                     "x": "x", "y": "b",
936                     "objectUUID": b"0123456789abcd07"})
937         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
938                     "name": b"OU #10",
939                     "x": "x", "y": "c",
940                     "objectUUID": b"0123456789abcd08"})
941         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
942                     "name": b"OU #10",
943                     "x": "x", "y": "c",
944                     "objectUUID": b"0123456789abcd09"})
945
946     def test_base(self):
947         """Testing a search"""
948
949         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
950                               scope=ldb.SCOPE_BASE)
951         self.assertEqual(len(res11), 1)
952
953     def test_base_lower(self):
954         """Testing a search"""
955
956         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
957                               scope=ldb.SCOPE_BASE)
958         self.assertEqual(len(res11), 1)
959
960     def test_base_or(self):
961         """Testing a search"""
962
963         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
964                               scope=ldb.SCOPE_BASE,
965                               expression="(|(ou=ou11)(ou=ou12))")
966         self.assertEqual(len(res11), 1)
967
968     def test_base_or2(self):
969         """Testing a search"""
970
971         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
972                               scope=ldb.SCOPE_BASE,
973                               expression="(|(x=y)(y=b))")
974         self.assertEqual(len(res11), 1)
975
976     def test_base_and(self):
977         """Testing a search"""
978
979         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
980                               scope=ldb.SCOPE_BASE,
981                               expression="(&(ou=ou11)(ou=ou12))")
982         self.assertEqual(len(res11), 0)
983
984     def test_base_and2(self):
985         """Testing a search"""
986
987         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
988                               scope=ldb.SCOPE_BASE,
989                               expression="(&(x=y)(y=a))")
990         self.assertEqual(len(res11), 1)
991
992     def test_base_false(self):
993         """Testing a search"""
994
995         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
996                               scope=ldb.SCOPE_BASE,
997                               expression="(|(ou=ou13)(ou=ou12))")
998         self.assertEqual(len(res11), 0)
999
1000     def test_check_base_false(self):
1001         """Testing a search"""
1002         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1003                               scope=ldb.SCOPE_BASE,
1004                               expression="(|(ou=ou13)(ou=ou12))")
1005         self.assertEqual(len(res11), 0)
1006
1007     def test_check_base_error(self):
1008         """Testing a search"""
1009         checkbaseonsearch = {"dn": "@OPTIONS",
1010                              "checkBaseOnSearch": b"TRUE"}
1011         try:
1012             self.l.add(checkbaseonsearch)
1013         except ldb.LdbError as err:
1014             enum = err.args[0]
1015             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1016             m = ldb.Message.from_dict(self.l,
1017                                       checkbaseonsearch)
1018             self.l.modify(m)
1019
1020         try:
1021             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
1022                                   scope=ldb.SCOPE_BASE,
1023                                   expression="(|(ou=ou13)(ou=ou12))")
1024             self.fail("Should have failed on missing base")
1025         except ldb.LdbError as err:
1026             enum = err.args[0]
1027             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1028
1029     def test_subtree(self):
1030         """Testing a search"""
1031
1032         try:
1033             res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1034                                   scope=ldb.SCOPE_SUBTREE)
1035             if hasattr(self, 'IDXCHECK'):
1036                 self.fail()
1037         except ldb.LdbError as err:
1038             enum = err.args[0]
1039             estr = err.args[1]
1040             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1041             self.assertIn(estr, "ldb FULL SEARCH disabled")
1042         else:
1043             self.assertEqual(len(res11), 25)
1044
1045     def test_subtree2(self):
1046         """Testing a search"""
1047
1048         try:
1049             res11 = self.l.search(base="DC=ORG",
1050                                   scope=ldb.SCOPE_SUBTREE)
1051             if hasattr(self, 'IDXCHECK'):
1052                 self.fail()
1053         except ldb.LdbError as err:
1054             enum = err.args[0]
1055             estr = err.args[1]
1056             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1057             self.assertIn(estr, "ldb FULL SEARCH disabled")
1058         else:
1059             self.assertEqual(len(res11), 36)
1060
1061     def test_subtree_and(self):
1062         """Testing a search"""
1063
1064         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1065                               scope=ldb.SCOPE_SUBTREE,
1066                               expression="(&(ou=ou11)(ou=ou12))")
1067         self.assertEqual(len(res11), 0)
1068
1069     def test_subtree_and2(self):
1070         """Testing a search"""
1071
1072         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1073                               scope=ldb.SCOPE_SUBTREE,
1074                               expression="(&(x=y)(|(y=b)(y=c)))")
1075         self.assertEqual(len(res11), 1)
1076
1077     def test_subtree_and2_lower(self):
1078         """Testing a search"""
1079
1080         res11 = self.l.search(base="DC=samba,DC=org",
1081                               scope=ldb.SCOPE_SUBTREE,
1082                               expression="(&(x=y)(|(y=b)(y=c)))")
1083         self.assertEqual(len(res11), 1)
1084
1085     def test_subtree_or(self):
1086         """Testing a search"""
1087
1088         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1089                               scope=ldb.SCOPE_SUBTREE,
1090                               expression="(|(ou=ou11)(ou=ou12))")
1091         self.assertEqual(len(res11), 2)
1092
1093     def test_subtree_or2(self):
1094         """Testing a search"""
1095
1096         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1097                               scope=ldb.SCOPE_SUBTREE,
1098                               expression="(|(x=y)(y=b))")
1099         self.assertEqual(len(res11), 20)
1100
1101     def test_subtree_or3(self):
1102         """Testing a search"""
1103
1104         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1105                               scope=ldb.SCOPE_SUBTREE,
1106                               expression="(|(x=y)(y=b)(y=c))")
1107         self.assertEqual(len(res11), 22)
1108
1109     def test_one_and(self):
1110         """Testing a search"""
1111
1112         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1113                               scope=ldb.SCOPE_ONELEVEL,
1114                               expression="(&(ou=ou11)(ou=ou12))")
1115         self.assertEqual(len(res11), 0)
1116
1117     def test_one_and2(self):
1118         """Testing a search"""
1119
1120         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1121                               scope=ldb.SCOPE_ONELEVEL,
1122                               expression="(&(x=y)(y=b))")
1123         self.assertEqual(len(res11), 1)
1124
1125     def test_one_or(self):
1126         """Testing a search"""
1127
1128         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1129                               scope=ldb.SCOPE_ONELEVEL,
1130                               expression="(|(ou=ou11)(ou=ou12))")
1131         self.assertEqual(len(res11), 2)
1132
1133     def test_one_or2(self):
1134         """Testing a search"""
1135
1136         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1137                               scope=ldb.SCOPE_ONELEVEL,
1138                               expression="(|(x=y)(y=b))")
1139         self.assertEqual(len(res11), 20)
1140
1141     def test_one_or2_lower(self):
1142         """Testing a search"""
1143
1144         res11 = self.l.search(base="DC=samba,DC=org",
1145                               scope=ldb.SCOPE_ONELEVEL,
1146                               expression="(|(x=y)(y=b))")
1147         self.assertEqual(len(res11), 20)
1148
1149     def test_one_unindexable(self):
1150         """Testing a search"""
1151
1152         try:
1153             res11 = self.l.search(base="DC=samba,DC=org",
1154                                   scope=ldb.SCOPE_ONELEVEL,
1155                                   expression="(y=b*)")
1156             if hasattr(self, 'IDX') and \
1157                not hasattr(self, 'IDXONE') and \
1158                hasattr(self, 'IDXCHECK'):
1159                 self.fail("Should have failed as un-indexed search")
1160
1161             self.assertEqual(len(res11), 9)
1162
1163         except ldb.LdbError as err:
1164             enum = err.args[0]
1165             estr = err.args[1]
1166             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1167             self.assertIn(estr, "ldb FULL SEARCH disabled")
1168
1169     def test_one_unindexable_presence(self):
1170         """Testing a search"""
1171
1172         try:
1173             res11 = self.l.search(base="DC=samba,DC=org",
1174                                   scope=ldb.SCOPE_ONELEVEL,
1175                                   expression="(y=*)")
1176             if hasattr(self, 'IDX') and \
1177                not hasattr(self, 'IDXONE') and \
1178                hasattr(self, 'IDXCHECK'):
1179                 self.fail("Should have failed as un-indexed search")
1180
1181             self.assertEqual(len(res11), 24)
1182
1183         except ldb.LdbError as err:
1184             enum = err.args[0]
1185             estr = err.args[1]
1186             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1187             self.assertIn(estr, "ldb FULL SEARCH disabled")
1188
1189     def test_subtree_and_or(self):
1190         """Testing a search"""
1191
1192         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1193                               scope=ldb.SCOPE_SUBTREE,
1194                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1195         self.assertEqual(len(res11), 0)
1196
1197     def test_subtree_and_or2(self):
1198         """Testing a search"""
1199
1200         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1201                               scope=ldb.SCOPE_SUBTREE,
1202                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1203         self.assertEqual(len(res11), 0)
1204
1205     def test_subtree_and_or3(self):
1206         """Testing a search"""
1207
1208         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1209                               scope=ldb.SCOPE_SUBTREE,
1210                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1211         self.assertEqual(len(res11), 2)
1212
1213     def test_subtree_and_or4(self):
1214         """Testing a search"""
1215
1216         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1217                               scope=ldb.SCOPE_SUBTREE,
1218                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1219         self.assertEqual(len(res11), 2)
1220
1221     def test_subtree_and_or5(self):
1222         """Testing a search"""
1223
1224         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1225                               scope=ldb.SCOPE_SUBTREE,
1226                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1227         self.assertEqual(len(res11), 1)
1228
1229     def test_subtree_or_and(self):
1230         """Testing a search"""
1231
1232         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1233                               scope=ldb.SCOPE_SUBTREE,
1234                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1235         self.assertEqual(len(res11), 10)
1236
1237     def test_subtree_large_and_unique(self):
1238         """Testing a search"""
1239
1240         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1241                               scope=ldb.SCOPE_SUBTREE,
1242                               expression="(&(ou=ou10)(y=a))")
1243         self.assertEqual(len(res11), 1)
1244
1245     def test_subtree_unique(self):
1246         """Testing a search"""
1247
1248         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1249                               scope=ldb.SCOPE_SUBTREE,
1250                               expression="(ou=ou10)")
1251         self.assertEqual(len(res11), 1)
1252
1253     def test_subtree_unique_elsewhere(self):
1254         """Testing a search"""
1255
1256         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1257                               scope=ldb.SCOPE_SUBTREE,
1258                               expression="(ou=ou10)")
1259         self.assertEqual(len(res11), 0)
1260
1261     def test_subtree_unique_elsewhere2(self):
1262         """Testing a search"""
1263
1264         res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1265                               scope=ldb.SCOPE_SUBTREE,
1266                               expression="(ou=unique)")
1267         self.assertEqual(len(res11), 1)
1268
1269     def test_subtree_unique_elsewhere3(self):
1270         """Testing a search"""
1271
1272         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1273                               scope=ldb.SCOPE_SUBTREE,
1274                               expression="(ou=unique)")
1275         self.assertEqual(len(res11), 0)
1276
1277     def test_subtree_unique_elsewhere4(self):
1278         """Testing a search"""
1279
1280         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1281                               scope=ldb.SCOPE_SUBTREE,
1282                               expression="(ou=unique)")
1283         self.assertEqual(len(res11), 0)
1284
1285     def test_subtree_unique_elsewhere5(self):
1286         """Testing a search"""
1287
1288         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1289                               scope=ldb.SCOPE_SUBTREE,
1290                               expression="(ou=unique)")
1291         self.assertEqual(len(res11), 0)
1292
1293     def test_subtree_unique_elsewhere6(self):
1294         """Testing a search"""
1295
1296         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1297                               scope=ldb.SCOPE_SUBTREE,
1298                               expression="(ou=unique)")
1299         self.assertEqual(len(res11), 0)
1300
1301     def test_subtree_unique_elsewhere7(self):
1302         """Testing a search"""
1303
1304         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1305                               scope=ldb.SCOPE_SUBTREE,
1306                               expression="(ou=ou10)")
1307         self.assertEqual(len(res11), 0)
1308
1309     def test_subtree_unique_here(self):
1310         """Testing a search"""
1311
1312         res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1313                               scope=ldb.SCOPE_SUBTREE,
1314                               expression="(ou=unique)")
1315         self.assertEqual(len(res11), 1)
1316
1317     def test_subtree_and_none(self):
1318         """Testing a search"""
1319
1320         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1321                               scope=ldb.SCOPE_SUBTREE,
1322                               expression="(&(ou=ouX)(y=a))")
1323         self.assertEqual(len(res11), 0)
1324
1325     def test_subtree_and_idx_record(self):
1326         """Testing a search against the index record"""
1327
1328         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1329                               scope=ldb.SCOPE_SUBTREE,
1330                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1331         self.assertEqual(len(res11), 0)
1332
1333     def test_subtree_and_idxone_record(self):
1334         """Testing a search against the index record"""
1335
1336         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1337                               scope=ldb.SCOPE_SUBTREE,
1338                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1339         self.assertEqual(len(res11), 0)
1340
1341     def test_onelevel(self):
1342         """Testing a search"""
1343
1344         try:
1345             res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1346                                   scope=ldb.SCOPE_ONELEVEL)
1347             if hasattr(self, 'IDXCHECK') \
1348                and not hasattr(self, 'IDXONE'):
1349                 self.fail()
1350         except ldb.LdbError as err:
1351             enum = err.args[0]
1352             estr = err.args[1]
1353             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1354             self.assertIn(estr, "ldb FULL SEARCH disabled")
1355         else:
1356             self.assertEqual(len(res11), 24)
1357
1358     def test_onelevel2(self):
1359         """Testing a search"""
1360
1361         try:
1362             res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1363                                   scope=ldb.SCOPE_ONELEVEL)
1364             if hasattr(self, 'IDXCHECK') \
1365                and not hasattr(self, 'IDXONE'):
1366                 self.fail()
1367                 self.fail()
1368         except ldb.LdbError as err:
1369             enum = err.args[0]
1370             estr = err.args[1]
1371             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1372             self.assertIn(estr, "ldb FULL SEARCH disabled")
1373         else:
1374             self.assertEqual(len(res11), 9)
1375
1376     def test_onelevel_and_or(self):
1377         """Testing a search"""
1378
1379         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1380                               scope=ldb.SCOPE_ONELEVEL,
1381                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1382         self.assertEqual(len(res11), 0)
1383
1384     def test_onelevel_and_or2(self):
1385         """Testing a search"""
1386
1387         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1388                               scope=ldb.SCOPE_ONELEVEL,
1389                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1390         self.assertEqual(len(res11), 0)
1391
1392     def test_onelevel_and_or3(self):
1393         """Testing a search"""
1394
1395         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1396                               scope=ldb.SCOPE_ONELEVEL,
1397                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1398         self.assertEqual(len(res11), 2)
1399
1400     def test_onelevel_and_or4(self):
1401         """Testing a search"""
1402
1403         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1404                               scope=ldb.SCOPE_ONELEVEL,
1405                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1406         self.assertEqual(len(res11), 2)
1407
1408     def test_onelevel_and_or5(self):
1409         """Testing a search"""
1410
1411         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1412                               scope=ldb.SCOPE_ONELEVEL,
1413                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1414         self.assertEqual(len(res11), 1)
1415
1416     def test_onelevel_or_and(self):
1417         """Testing a search"""
1418
1419         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1420                               scope=ldb.SCOPE_ONELEVEL,
1421                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1422         self.assertEqual(len(res11), 10)
1423
1424     def test_onelevel_large_and_unique(self):
1425         """Testing a search"""
1426
1427         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1428                               scope=ldb.SCOPE_ONELEVEL,
1429                               expression="(&(ou=ou10)(y=a))")
1430         self.assertEqual(len(res11), 1)
1431
1432     def test_onelevel_unique(self):
1433         """Testing a search"""
1434
1435         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1436                               scope=ldb.SCOPE_ONELEVEL,
1437                               expression="(ou=ou10)")
1438         self.assertEqual(len(res11), 1)
1439
1440     def test_onelevel_unique_elsewhere(self):
1441         """Testing a search"""
1442
1443         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1444                               scope=ldb.SCOPE_ONELEVEL,
1445                               expression="(ou=ou10)")
1446         self.assertEqual(len(res11), 0)
1447
1448     def test_onelevel_unique_elsewhere2(self):
1449         """Testing a search (showing that onelevel is not subtree)"""
1450
1451         res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1452                               scope=ldb.SCOPE_ONELEVEL,
1453                               expression="(ou=unique)")
1454         self.assertEqual(len(res11), 1)
1455
1456     def test_onelevel_unique_elsewhere3(self):
1457         """Testing a search (showing that onelevel is not subtree)"""
1458
1459         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1460                               scope=ldb.SCOPE_ONELEVEL,
1461                               expression="(ou=unique)")
1462         self.assertEqual(len(res11), 0)
1463
1464     def test_onelevel_unique_elsewhere4(self):
1465         """Testing a search (showing that onelevel is not subtree)"""
1466
1467         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1468                               scope=ldb.SCOPE_ONELEVEL,
1469                               expression="(ou=unique)")
1470         self.assertEqual(len(res11), 0)
1471
1472     def test_onelevel_unique_elsewhere5(self):
1473         """Testing a search (showing that onelevel is not subtree)"""
1474
1475         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1476                               scope=ldb.SCOPE_ONELEVEL,
1477                               expression="(ou=unique)")
1478         self.assertEqual(len(res11), 0)
1479
1480     def test_onelevel_unique_elsewhere6(self):
1481         """Testing a search"""
1482
1483         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1484                               scope=ldb.SCOPE_ONELEVEL,
1485                               expression="(ou=ou10)")
1486         self.assertEqual(len(res11), 0)
1487
1488     def test_onelevel_unique_here(self):
1489         """Testing a search"""
1490
1491         res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1492                               scope=ldb.SCOPE_ONELEVEL,
1493                               expression="(ou=unique)")
1494         self.assertEqual(len(res11), 0)
1495
1496     def test_onelevel_and_none(self):
1497         """Testing a search"""
1498
1499         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1500                               scope=ldb.SCOPE_ONELEVEL,
1501                               expression="(&(ou=ouX)(y=a))")
1502         self.assertEqual(len(res11), 0)
1503
1504     def test_onelevel_and_idx_record(self):
1505         """Testing a search against the index record"""
1506
1507         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1508                               scope=ldb.SCOPE_ONELEVEL,
1509                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1510         self.assertEqual(len(res11), 0)
1511
1512     def test_onelevel_and_idxone_record(self):
1513         """Testing a search against the index record"""
1514
1515         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1516                               scope=ldb.SCOPE_ONELEVEL,
1517                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1518         self.assertEqual(len(res11), 0)
1519
1520     def test_subtree_unindexable(self):
1521         """Testing a search"""
1522
1523         try:
1524             res11 = self.l.search(base="DC=samba,DC=org",
1525                                   scope=ldb.SCOPE_SUBTREE,
1526                                   expression="(y=b*)")
1527             if hasattr(self, 'IDX') and \
1528                hasattr(self, 'IDXCHECK'):
1529                 self.fail("Should have failed as un-indexed search")
1530
1531             self.assertEqual(len(res11), 9)
1532
1533         except ldb.LdbError as err:
1534             enum = err.args[0]
1535             estr = err.args[1]
1536             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1537             self.assertIn(estr, "ldb FULL SEARCH disabled")
1538
1539     def test_onelevel_only_and_or(self):
1540         """Testing a search (showing that onelevel is not subtree)"""
1541
1542         res11 = self.l.search(base="DC=ORG",
1543                               scope=ldb.SCOPE_ONELEVEL,
1544                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1545         self.assertEqual(len(res11), 0)
1546
1547     def test_onelevel_only_and_or2(self):
1548         """Testing a search (showing that onelevel is not subtree)"""
1549
1550         res11 = self.l.search(base="DC=ORG",
1551                               scope=ldb.SCOPE_ONELEVEL,
1552                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1553         self.assertEqual(len(res11), 0)
1554
1555     def test_onelevel_only_and_or3(self):
1556         """Testing a search (showing that onelevel is not subtree)"""
1557
1558         res11 = self.l.search(base="DC=ORG",
1559                               scope=ldb.SCOPE_ONELEVEL,
1560                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1561         self.assertEqual(len(res11), 0)
1562
1563     def test_onelevel_only_and_or4(self):
1564         """Testing a search (showing that onelevel is not subtree)"""
1565
1566         res11 = self.l.search(base="DC=ORG",
1567                               scope=ldb.SCOPE_ONELEVEL,
1568                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1569         self.assertEqual(len(res11), 0)
1570
1571     def test_onelevel_only_and_or5(self):
1572         """Testing a search (showing that onelevel is not subtree)"""
1573
1574         res11 = self.l.search(base="DC=ORG",
1575                               scope=ldb.SCOPE_ONELEVEL,
1576                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1577         self.assertEqual(len(res11), 0)
1578
1579     def test_onelevel_only_or_and(self):
1580         """Testing a search (showing that onelevel is not subtree)"""
1581
1582         res11 = self.l.search(base="DC=ORG",
1583                               scope=ldb.SCOPE_ONELEVEL,
1584                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1585         self.assertEqual(len(res11), 0)
1586
1587     def test_onelevel_only_large_and_unique(self):
1588         """Testing a search (showing that onelevel is not subtree)"""
1589
1590         res11 = self.l.search(base="DC=ORG",
1591                               scope=ldb.SCOPE_ONELEVEL,
1592                               expression="(&(ou=ou10)(y=a))")
1593         self.assertEqual(len(res11), 0)
1594
1595     def test_onelevel_only_unique(self):
1596         """Testing a search (showing that onelevel is not subtree)"""
1597
1598         res11 = self.l.search(base="DC=ORG",
1599                               scope=ldb.SCOPE_ONELEVEL,
1600                               expression="(ou=ou10)")
1601         self.assertEqual(len(res11), 0)
1602
1603     def test_onelevel_only_unique2(self):
1604         """Testing a search"""
1605
1606         res11 = self.l.search(base="DC=ORG",
1607                               scope=ldb.SCOPE_ONELEVEL,
1608                               expression="(ou=unique)")
1609         self.assertEqual(len(res11), 0)
1610
1611     def test_onelevel_only_and_none(self):
1612         """Testing a search (showing that onelevel is not subtree)"""
1613
1614         res11 = self.l.search(base="DC=ORG",
1615                               scope=ldb.SCOPE_ONELEVEL,
1616                               expression="(&(ou=ouX)(y=a))")
1617         self.assertEqual(len(res11), 0)
1618
1619     def test_onelevel_small_and_or(self):
1620         """Testing a search (showing that onelevel is not subtree)"""
1621
1622         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1623                               scope=ldb.SCOPE_ONELEVEL,
1624                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1625         self.assertEqual(len(res11), 0)
1626
1627     def test_onelevel_small_and_or2(self):
1628         """Testing a search (showing that onelevel is not subtree)"""
1629
1630         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1631                               scope=ldb.SCOPE_ONELEVEL,
1632                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1633         self.assertEqual(len(res11), 0)
1634
1635     def test_onelevel_small_and_or3(self):
1636         """Testing a search (showing that onelevel is not subtree)"""
1637
1638         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1639                               scope=ldb.SCOPE_ONELEVEL,
1640                               expression="(&(|(ou=ou1)(ou=ou2))(|(x=y)(y=b)(y=c)))")
1641         self.assertEqual(len(res11), 2)
1642
1643     def test_onelevel_small_and_or4(self):
1644         """Testing a search (showing that onelevel is not subtree)"""
1645
1646         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1647                               scope=ldb.SCOPE_ONELEVEL,
1648                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou1)(ou=ou2)))")
1649         self.assertEqual(len(res11), 2)
1650
1651     def test_onelevel_small_and_or5(self):
1652         """Testing a search (showing that onelevel is not subtree)"""
1653
1654         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1655                               scope=ldb.SCOPE_ONELEVEL,
1656                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou1))")
1657         self.assertEqual(len(res11), 1)
1658
1659     def test_onelevel_small_or_and(self):
1660         """Testing a search (showing that onelevel is not subtree)"""
1661
1662         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1663                               scope=ldb.SCOPE_ONELEVEL,
1664                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1665         self.assertEqual(len(res11), 2)
1666
1667     def test_onelevel_small_large_and_unique(self):
1668         """Testing a search (showing that onelevel is not subtree)"""
1669
1670         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1671                               scope=ldb.SCOPE_ONELEVEL,
1672                               expression="(&(ou=ou9)(y=a))")
1673         self.assertEqual(len(res11), 1)
1674
1675     def test_onelevel_small_unique_elsewhere(self):
1676         """Testing a search (showing that onelevel is not subtree)"""
1677
1678         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1679                               scope=ldb.SCOPE_ONELEVEL,
1680                               expression="(ou=ou10)")
1681         self.assertEqual(len(res11), 0)
1682
1683     def test_onelevel_small_and_none(self):
1684         """Testing a search (showing that onelevel is not subtree)"""
1685
1686         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1687                               scope=ldb.SCOPE_ONELEVEL,
1688                               expression="(&(ou=ouX)(y=a))")
1689         self.assertEqual(len(res11), 0)
1690
1691     def test_subtree_unindexable_presence(self):
1692         """Testing a search"""
1693
1694         try:
1695             res11 = self.l.search(base="DC=samba,DC=org",
1696                                   scope=ldb.SCOPE_SUBTREE,
1697                                   expression="(y=*)")
1698             if hasattr(self, 'IDX') and \
1699                hasattr(self, 'IDXCHECK'):
1700                 self.fail("Should have failed as un-indexed search")
1701
1702             self.assertEqual(len(res11), 24)
1703
1704         except ldb.LdbError as err:
1705             enum = err.args[0]
1706             estr = err.args[1]
1707             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1708             self.assertIn(estr, "ldb FULL SEARCH disabled")
1709
1710     def test_dn_filter_one(self):
1711         """Testing that a dn= filter succeeds
1712         (or fails with disallowDNFilter
1713         set and IDXGUID or (IDX and not IDXONE) mode)
1714         when the scope is SCOPE_ONELEVEL.
1715
1716         This should be made more consistent, but for now lock in
1717         the behaviour
1718
1719         """
1720
1721         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1722                               scope=ldb.SCOPE_ONELEVEL,
1723                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1724         if hasattr(self, 'disallowDNFilter') and \
1725            hasattr(self, 'IDX') and \
1726            (hasattr(self, 'IDXGUID') or
1727             ((not hasattr(self, 'IDXONE') and hasattr(self, 'IDX')))):
1728             self.assertEqual(len(res11), 0)
1729         else:
1730             self.assertEqual(len(res11), 1)
1731
1732     def test_dn_filter_subtree(self):
1733         """Testing that a dn= filter succeeds
1734         (or fails with disallowDNFilter set)
1735         when the scope is SCOPE_SUBTREE"""
1736
1737         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1738                               scope=ldb.SCOPE_SUBTREE,
1739                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1740         if hasattr(self, 'disallowDNFilter') \
1741            and hasattr(self, 'IDX'):
1742             self.assertEqual(len(res11), 0)
1743         else:
1744             self.assertEqual(len(res11), 1)
1745
1746     def test_dn_filter_base(self):
1747         """Testing that (incorrectly) a dn= filter works
1748         when the scope is SCOPE_BASE"""
1749
1750         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1751                               scope=ldb.SCOPE_BASE,
1752                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1753
1754         # At some point we should fix this, but it isn't trivial
1755         self.assertEqual(len(res11), 1)
1756
1757     def test_distinguishedName_filter_one(self):
1758         """Testing that a distinguishedName= filter succeeds
1759         when the scope is SCOPE_ONELEVEL.
1760
1761         This should be made more consistent, but for now lock in
1762         the behaviour
1763
1764         """
1765
1766         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1767                               scope=ldb.SCOPE_ONELEVEL,
1768                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1769         self.assertEqual(len(res11), 1)
1770
1771     def test_distinguishedName_filter_subtree(self):
1772         """Testing that a distinguishedName= filter succeeds
1773         when the scope is SCOPE_SUBTREE"""
1774
1775         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1776                               scope=ldb.SCOPE_SUBTREE,
1777                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1778         self.assertEqual(len(res11), 1)
1779
1780     def test_distinguishedName_filter_base(self):
1781         """Testing that (incorrectly) a distinguishedName= filter works
1782         when the scope is SCOPE_BASE"""
1783
1784         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1785                               scope=ldb.SCOPE_BASE,
1786                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1787
1788         # At some point we should fix this, but it isn't trivial
1789         self.assertEqual(len(res11), 1)
1790
1791     def test_bad_dn_filter_base(self):
1792         """Testing that a dn= filter on an invalid DN works
1793         when the scope is SCOPE_BASE but
1794         returns zero results"""
1795
1796         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1797                               scope=ldb.SCOPE_BASE,
1798                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1799
1800         # At some point we should fix this, but it isn't trivial
1801         self.assertEqual(len(res11), 0)
1802
1803
1804     def test_bad_dn_filter_one(self):
1805         """Testing that a dn= filter succeeds but returns zero
1806         results when the DN is not valid on a SCOPE_ONELEVEL search
1807
1808         """
1809
1810         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1811                               scope=ldb.SCOPE_ONELEVEL,
1812                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1813         self.assertEqual(len(res11), 0)
1814
1815     def test_bad_dn_filter_subtree(self):
1816         """Testing that a dn= filter succeeds but returns zero
1817         results when the DN is not valid on a SCOPE_SUBTREE search
1818
1819         """
1820
1821         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1822                               scope=ldb.SCOPE_SUBTREE,
1823                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1824         self.assertEqual(len(res11), 0)
1825
1826     def test_bad_distinguishedName_filter_base(self):
1827         """Testing that a distinguishedName= filter on an invalid DN works
1828         when the scope is SCOPE_BASE but
1829         returns zero results"""
1830
1831         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1832                               scope=ldb.SCOPE_BASE,
1833                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1834
1835         # At some point we should fix this, but it isn't trivial
1836         self.assertEqual(len(res11), 0)
1837
1838
1839     def test_bad_distinguishedName_filter_one(self):
1840         """Testing that a distinguishedName= filter succeeds but returns zero
1841         results when the DN is not valid on a SCOPE_ONELEVEL search
1842
1843         """
1844
1845         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1846                               scope=ldb.SCOPE_ONELEVEL,
1847                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1848         self.assertEqual(len(res11), 0)
1849
1850     def test_bad_distinguishedName_filter_subtree(self):
1851         """Testing that a distinguishedName= filter succeeds but returns zero
1852         results when the DN is not valid on a SCOPE_SUBTREE search
1853
1854         """
1855
1856         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1857                               scope=ldb.SCOPE_SUBTREE,
1858                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1859         self.assertEqual(len(res11), 0)
1860
1861     def test_bad_dn_search_base(self):
1862         """Testing with a bad base DN (SCOPE_BASE)"""
1863
1864         try:
1865             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1866                                   scope=ldb.SCOPE_BASE)
1867             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1868         except ldb.LdbError as err:
1869             enum = err.args[0]
1870             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1871
1872
1873     def test_bad_dn_search_one(self):
1874         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1875
1876         try:
1877             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1878                                   scope=ldb.SCOPE_ONELEVEL)
1879             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1880         except ldb.LdbError as err:
1881             enum = err.args[0]
1882             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1883
1884     def test_bad_dn_search_subtree(self):
1885         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1886
1887         try:
1888             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1889                                   scope=ldb.SCOPE_SUBTREE)
1890             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1891         except ldb.LdbError as err:
1892             enum = err.args[0]
1893             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1894
1895
1896
1897 # Run the search tests against an lmdb backend
1898 class SearchTestsLmdb(SearchTests):
1899
1900     def setUp(self):
1901         if os.environ.get('HAVE_LMDB', '1') == '0':
1902             self.skipTest("No lmdb backend")
1903         self.prefix = MDB_PREFIX
1904         self.index = MDB_INDEX_OBJ
1905         super(SearchTestsLmdb, self).setUp()
1906
1907     def tearDown(self):
1908         super(SearchTestsLmdb, self).tearDown()
1909
1910
1911 class IndexedSearchTests(SearchTests):
1912     """Test searches using the index, to ensure the index doesn't
1913        break things"""
1914
1915     def setUp(self):
1916         super(IndexedSearchTests, self).setUp()
1917         self.l.add({"dn": "@INDEXLIST",
1918                     "@IDXATTR": [b"x", b"y", b"ou"]})
1919         self.IDX = True
1920
1921
1922 class IndexedCheckSearchTests(IndexedSearchTests):
1923     """Test searches using the index, to ensure the index doesn't
1924        break things (full scan disabled)"""
1925
1926     def setUp(self):
1927         self.IDXCHECK = True
1928         super(IndexedCheckSearchTests, self).setUp()
1929
1930
1931 class IndexedSearchDnFilterTests(SearchTests):
1932     """Test searches using the index, to ensure the index doesn't
1933        break things"""
1934
1935     def setUp(self):
1936         super(IndexedSearchDnFilterTests, self).setUp()
1937         self.l.add({"dn": "@OPTIONS",
1938                     "disallowDNFilter": "TRUE"})
1939         self.disallowDNFilter = True
1940
1941         self.l.add({"dn": "@INDEXLIST",
1942                     "@IDXATTR": [b"x", b"y", b"ou"]})
1943         self.IDX = True
1944
1945
1946 class IndexedAndOneLevelSearchTests(SearchTests):
1947     """Test searches using the index including @IDXONE, to ensure
1948        the index doesn't break things"""
1949
1950     def setUp(self):
1951         super(IndexedAndOneLevelSearchTests, self).setUp()
1952         self.l.add({"dn": "@INDEXLIST",
1953                     "@IDXATTR": [b"x", b"y", b"ou"],
1954                     "@IDXONE": [b"1"]})
1955         self.IDX = True
1956         self.IDXONE = True
1957
1958
1959 class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
1960     """Test searches using the index including @IDXONE, to ensure
1961        the index doesn't break things (full scan disabled)"""
1962
1963     def setUp(self):
1964         self.IDXCHECK = True
1965         super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1966
1967
1968 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1969     """Test searches using the index including @IDXONE, to ensure
1970        the index doesn't break things"""
1971
1972     def setUp(self):
1973         super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1974         self.l.add({"dn": "@OPTIONS",
1975                     "disallowDNFilter": "TRUE",
1976                     "checkBaseOnSearch": "TRUE"})
1977         self.disallowDNFilter = True
1978         self.checkBaseOnSearch = True
1979
1980         self.l.add({"dn": "@INDEXLIST",
1981                     "@IDXATTR": [b"x", b"y", b"ou"],
1982                     "@IDXONE": [b"1"]})
1983         self.IDX = True
1984         self.IDXONE = True
1985
1986
1987 class GUIDIndexedSearchTests(SearchTests):
1988     """Test searches using the index, to ensure the index doesn't
1989        break things"""
1990
1991     def setUp(self):
1992         self.index = {"dn": "@INDEXLIST",
1993                       "@IDXATTR": [b"x", b"y", b"ou"],
1994                       "@IDXGUID": [b"objectUUID"],
1995                       "@IDX_DN_GUID": [b"GUID"]}
1996         super(GUIDIndexedSearchTests, self).setUp()
1997
1998         self.IDXGUID = True
1999
2000
2001 class GUIDIndexedDNFilterSearchTests(SearchTests):
2002     """Test searches using the index, to ensure the index doesn't
2003        break things"""
2004
2005     def setUp(self):
2006         self.index = {"dn": "@INDEXLIST",
2007                       "@IDXATTR": [b"x", b"y", b"ou"],
2008                       "@IDXGUID": [b"objectUUID"],
2009                       "@IDX_DN_GUID": [b"GUID"]}
2010         super(GUIDIndexedDNFilterSearchTests, self).setUp()
2011         self.l.add({"dn": "@OPTIONS",
2012                     "disallowDNFilter": "TRUE",
2013                     "checkBaseOnSearch": "TRUE"})
2014         self.disallowDNFilter = True
2015         self.checkBaseOnSearch = True
2016         self.IDX = True
2017         self.IDXGUID = True
2018
2019
2020 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
2021     """Test searches using the index including @IDXONE, to ensure
2022        the index doesn't break things"""
2023
2024     def setUp(self):
2025         self.index = {"dn": "@INDEXLIST",
2026                       "@IDXATTR": [b"x", b"y", b"ou"],
2027                       "@IDXONE": [b"1"],
2028                       "@IDXGUID": [b"objectUUID"],
2029                       "@IDX_DN_GUID": [b"GUID"]}
2030         super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
2031         self.l.add({"dn": "@OPTIONS",
2032                     "disallowDNFilter": "TRUE",
2033                     "checkBaseOnSearch": "TRUE"})
2034         self.disallowDNFilter = True
2035         self.checkBaseOnSearch = True
2036         self.IDX = True
2037         self.IDXGUID = True
2038         self.IDXONE = True
2039
2040
2041 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
2042
2043     def setUp(self):
2044         if os.environ.get('HAVE_LMDB', '1') == '0':
2045             self.skipTest("No lmdb backend")
2046         self.prefix = MDB_PREFIX
2047         super(GUIDIndexedSearchTestsLmdb, self).setUp()
2048
2049     def tearDown(self):
2050         super(GUIDIndexedSearchTestsLmdb, self).tearDown()
2051
2052
2053 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
2054
2055     def setUp(self):
2056         if os.environ.get('HAVE_LMDB', '1') == '0':
2057             self.skipTest("No lmdb backend")
2058         self.prefix = MDB_PREFIX
2059         super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
2060
2061     def tearDown(self):
2062         super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
2063
2064
2065 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
2066
2067     def setUp(self):
2068         if os.environ.get('HAVE_LMDB', '1') == '0':
2069             self.skipTest("No lmdb backend")
2070         self.prefix = MDB_PREFIX
2071         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
2072
2073     def tearDown(self):
2074         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
2075
2076
2077 class AddModifyTests(LdbBaseTest):
2078     def tearDown(self):
2079         shutil.rmtree(self.testdir)
2080         super(AddModifyTests, self).tearDown()
2081
2082         # Ensure the LDB is closed now, so we close the FD
2083         del(self.l)
2084
2085     def setUp(self):
2086         super(AddModifyTests, self).setUp()
2087         self.testdir = tempdir()
2088         self.filename = os.path.join(self.testdir, "add_test.ldb")
2089         self.l = ldb.Ldb(self.url(),
2090                          flags=self.flags(),
2091                          options=["modules:rdn_name"])
2092         try:
2093             self.l.add(self.index)
2094         except AttributeError:
2095             pass
2096
2097         self.l.add({"dn": "DC=SAMBA,DC=ORG",
2098                     "name": b"samba.org",
2099                     "objectUUID": b"0123456789abcdef"})
2100         self.l.add({"dn": "@ATTRIBUTES",
2101                     "objectUUID": "UNIQUE_INDEX"})
2102
2103     def test_add_dup(self):
2104         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2105                     "name": b"Admins",
2106                     "x": "z", "y": "a",
2107                     "objectUUID": b"0123456789abcde1"})
2108         try:
2109             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2110                         "name": b"Admins",
2111                         "x": "z", "y": "a",
2112                         "objectUUID": b"0123456789abcde2"})
2113             self.fail("Should have failed adding duplicate entry")
2114         except ldb.LdbError as err:
2115             enum = err.args[0]
2116             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2117
2118     def test_add_bad(self):
2119         try:
2120             self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
2121                         "name": b"Admins",
2122                         "x": "z", "y": "a",
2123                         "objectUUID": b"0123456789abcde1"})
2124             self.fail("Should have failed adding entry with invalid DN")
2125         except ldb.LdbError as err:
2126             enum = err.args[0]
2127             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2128
2129     def test_add_del_add(self):
2130         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2131                     "name": b"Admins",
2132                     "x": "z", "y": "a",
2133                     "objectUUID": b"0123456789abcde1"})
2134         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
2135         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2136                     "name": b"Admins",
2137                     "x": "z", "y": "a",
2138                     "objectUUID": b"0123456789abcde2"})
2139
2140     def test_add_move_add(self):
2141         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2142                     "name": b"Admins",
2143                     "x": "z", "y": "a",
2144                     "objectUUID": b"0123456789abcde1"})
2145         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2146                       "OU=DUP2,DC=SAMBA,DC=ORG")
2147         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2148                     "name": b"Admins",
2149                     "x": "z", "y": "a",
2150                     "objectUUID": b"0123456789abcde2"})
2151
2152     def test_add_move_fail_move_move(self):
2153         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2154                     "name": b"Admins",
2155                     "x": "z", "y": "a",
2156                     "objectUUID": b"0123456789abcde1"})
2157         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2158                     "name": b"Admins",
2159                     "x": "z", "y": "a",
2160                     "objectUUID": b"0123456789abcde2"})
2161
2162         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
2163                              scope=ldb.SCOPE_SUBTREE,
2164                              expression="(objectUUID=0123456789abcde1)")
2165         self.assertEqual(len(res2), 1)
2166         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
2167
2168         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
2169                              scope=ldb.SCOPE_SUBTREE,
2170                              expression="(objectUUID=0123456789abcde2)")
2171         self.assertEqual(len(res3), 1)
2172         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
2173
2174         try:
2175             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2176                           "OU=DUP2,DC=SAMBA,DC=ORG")
2177             self.fail("Should have failed on duplicate DN")
2178         except ldb.LdbError as err:
2179             enum = err.args[0]
2180             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2181
2182         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
2183                       "OU=DUP3,DC=SAMBA,DC=ORG")
2184
2185         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2186                       "OU=DUP2,DC=SAMBA,DC=ORG")
2187
2188         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
2189                              scope=ldb.SCOPE_SUBTREE,
2190                              expression="(objectUUID=0123456789abcde1)")
2191         self.assertEqual(len(res2), 1)
2192         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
2193
2194         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
2195                              scope=ldb.SCOPE_SUBTREE,
2196                              expression="(objectUUID=0123456789abcde2)")
2197         self.assertEqual(len(res3), 1)
2198         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
2199
2200     def test_move_missing(self):
2201         try:
2202             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2203                           "OU=DUP2,DC=SAMBA,DC=ORG")
2204             self.fail("Should have failed on missing")
2205         except ldb.LdbError as err:
2206             enum = err.args[0]
2207             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
2208
2209     def test_move_missing2(self):
2210         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2211                     "name": b"Admins",
2212                     "x": "z", "y": "a",
2213                     "objectUUID": b"0123456789abcde2"})
2214
2215         try:
2216             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2217                           "OU=DUP2,DC=SAMBA,DC=ORG")
2218             self.fail("Should have failed on missing")
2219         except ldb.LdbError as err:
2220             enum = err.args[0]
2221             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
2222
2223     def test_move_bad(self):
2224         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2225                     "name": b"Admins",
2226                     "x": "z", "y": "a",
2227                     "objectUUID": b"0123456789abcde2"})
2228
2229         try:
2230             self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
2231                           "OU=DUP2,DC=SAMBA,DC=ORG")
2232             self.fail("Should have failed on invalid DN")
2233         except ldb.LdbError as err:
2234             enum = err.args[0]
2235             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2236
2237     def test_move_bad2(self):
2238         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2239                     "name": b"Admins",
2240                     "x": "z", "y": "a",
2241                     "objectUUID": b"0123456789abcde2"})
2242
2243         try:
2244             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2245                           "OUXDUP2,DC=SAMBA,DC=ORG")
2246             self.fail("Should have failed on missing")
2247         except ldb.LdbError as err:
2248             enum = err.args[0]
2249             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
2250
2251     def test_move_fail_move_add(self):
2252         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2253                     "name": b"Admins",
2254                     "x": "z", "y": "a",
2255                     "objectUUID": b"0123456789abcde1"})
2256         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2257                     "name": b"Admins",
2258                     "x": "z", "y": "a",
2259                     "objectUUID": b"0123456789abcde2"})
2260         try:
2261             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
2262                           "OU=DUP2,DC=SAMBA,DC=ORG")
2263             self.fail("Should have failed on duplicate DN")
2264         except ldb.LdbError as err:
2265             enum = err.args[0]
2266             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2267
2268         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
2269                       "OU=DUP3,DC=SAMBA,DC=ORG")
2270
2271         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2272                     "name": b"Admins",
2273                     "x": "z", "y": "a",
2274                     "objectUUID": b"0123456789abcde3"})
2275
2276
2277 class AddModifyTestsLmdb(AddModifyTests):
2278
2279     def setUp(self):
2280         if os.environ.get('HAVE_LMDB', '1') == '0':
2281             self.skipTest("No lmdb backend")
2282         self.prefix = MDB_PREFIX
2283         self.index = MDB_INDEX_OBJ
2284         super(AddModifyTestsLmdb, self).setUp()
2285
2286     def tearDown(self):
2287         super(AddModifyTestsLmdb, self).tearDown()
2288
2289
2290 class IndexedAddModifyTests(AddModifyTests):
2291     """Test searches using the index, to ensure the index doesn't
2292        break things"""
2293
2294     def setUp(self):
2295         if not hasattr(self, 'index'):
2296             self.index = {"dn": "@INDEXLIST",
2297                           "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
2298                           "@IDXONE": [b"1"]}
2299         super(IndexedAddModifyTests, self).setUp()
2300
2301     def test_duplicate_GUID(self):
2302         try:
2303             self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
2304                         "name": b"Admins",
2305                         "x": "z", "y": "a",
2306                         "objectUUID": b"0123456789abcdef"})
2307             self.fail("Should have failed adding duplicate GUID")
2308         except ldb.LdbError as err:
2309             enum = err.args[0]
2310             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2311
2312     def test_duplicate_name_dup_GUID(self):
2313         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2314                     "name": b"Admins",
2315                     "x": "z", "y": "a",
2316                     "objectUUID": b"a123456789abcdef"})
2317         try:
2318             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2319                         "name": b"Admins",
2320                         "x": "z", "y": "a",
2321                         "objectUUID": b"a123456789abcdef"})
2322             self.fail("Should have failed adding duplicate GUID")
2323         except ldb.LdbError as err:
2324             enum = err.args[0]
2325             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2326
2327     def test_duplicate_name_dup_GUID2(self):
2328         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2329                     "name": b"Admins",
2330                     "x": "z", "y": "a",
2331                     "objectUUID": b"abc3456789abcdef"})
2332         try:
2333             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
2334                         "name": b"Admins",
2335                         "x": "z", "y": "a",
2336                         "objectUUID": b"aaa3456789abcdef"})
2337             self.fail("Should have failed adding duplicate DN")
2338         except ldb.LdbError as err:
2339             enum = err.args[0]
2340             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
2341
2342         # Checking the GUID didn't stick in the index
2343         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2344                     "name": b"Admins",
2345                     "x": "z", "y": "a",
2346                     "objectUUID": b"aaa3456789abcdef"})
2347
2348     def test_add_dup_guid_add(self):
2349         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
2350                     "name": b"Admins",
2351                     "x": "z", "y": "a",
2352                     "objectUUID": b"0123456789abcde1"})
2353         try:
2354             self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2355                         "name": b"Admins",
2356                         "x": "z", "y": "a",
2357                         "objectUUID": b"0123456789abcde1"})
2358             self.fail("Should have failed on duplicate GUID")
2359
2360         except ldb.LdbError as err:
2361             enum = err.args[0]
2362             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2363
2364         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
2365                     "name": b"Admins",
2366                     "x": "z", "y": "a",
2367                     "objectUUID": b"0123456789abcde2"})
2368
2369     def test_duplicate_index_values(self):
2370         self.l.add({"dn": "OU=DIV1,DC=SAMBA,DC=ORG",
2371                     "name": b"Admins",
2372                     "z": "1",
2373                     "objectUUID": b"0123456789abcdff"})
2374         self.l.add({"dn": "OU=DIV2,DC=SAMBA,DC=ORG",
2375                     "name": b"Admins",
2376                     "z": "1",
2377                     "objectUUID": b"0123456789abcdfd"})
2378
2379
2380 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
2381     """Test searches using the index, to ensure the index doesn't
2382        break things"""
2383
2384     def setUp(self):
2385         self.index = {"dn": "@INDEXLIST",
2386                       "@IDXATTR": [b"x", b"y", b"ou"],
2387                       "@IDXONE": [b"1"],
2388                       "@IDXGUID": [b"objectUUID"],
2389                       "@IDX_DN_GUID": [b"GUID"]}
2390         super(GUIDIndexedAddModifyTests, self).setUp()
2391
2392
2393 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
2394     """Test GUID index behaviour insdie the transaction"""
2395
2396     def setUp(self):
2397         super(GUIDTransIndexedAddModifyTests, self).setUp()
2398         self.l.transaction_start()
2399
2400     def tearDown(self):
2401         self.l.transaction_commit()
2402         super(GUIDTransIndexedAddModifyTests, self).tearDown()
2403
2404
2405 class TransIndexedAddModifyTests(IndexedAddModifyTests):
2406     """Test index behaviour insdie the transaction"""
2407
2408     def setUp(self):
2409         super(TransIndexedAddModifyTests, self).setUp()
2410         self.l.transaction_start()
2411
2412     def tearDown(self):
2413         self.l.transaction_commit()
2414         super(TransIndexedAddModifyTests, self).tearDown()
2415
2416
2417 class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
2418
2419     def setUp(self):
2420         if os.environ.get('HAVE_LMDB', '1') == '0':
2421             self.skipTest("No lmdb backend")
2422         self.prefix = MDB_PREFIX
2423         super(GuidIndexedAddModifyTestsLmdb, self).setUp()
2424
2425     def tearDown(self):
2426         super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
2427
2428
2429 class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
2430
2431     def setUp(self):
2432         if os.environ.get('HAVE_LMDB', '1') == '0':
2433             self.skipTest("No lmdb backend")
2434         self.prefix = MDB_PREFIX
2435         super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
2436
2437     def tearDown(self):
2438         super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
2439
2440
2441 class BadIndexTests(LdbBaseTest):
2442     def setUp(self):
2443         super(BadIndexTests, self).setUp()
2444         self.testdir = tempdir()
2445         self.filename = os.path.join(self.testdir, "test.ldb")
2446         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
2447         if hasattr(self, 'IDXGUID'):
2448             self.ldb.add({"dn": "@INDEXLIST",
2449                           "@IDXATTR": [b"x", b"y", b"ou"],
2450                           "@IDXGUID": [b"objectUUID"],
2451                           "@IDX_DN_GUID": [b"GUID"]})
2452         else:
2453             self.ldb.add({"dn": "@INDEXLIST",
2454                           "@IDXATTR": [b"x", b"y", b"ou"]})
2455
2456         super(BadIndexTests, self).setUp()
2457
2458     def test_unique(self):
2459         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2460                       "objectUUID": b"0123456789abcde1",
2461                       "y": "1"})
2462         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2463                       "objectUUID": b"0123456789abcde2",
2464                       "y": "1"})
2465         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2466                       "objectUUID": b"0123456789abcde3",
2467                       "y": "1"})
2468
2469         res = self.ldb.search(expression="(y=1)",
2470                               base="dc=samba,dc=org")
2471         self.assertEqual(len(res), 3)
2472
2473         # Now set this to unique index, but forget to check the result
2474         try:
2475             self.ldb.add({"dn": "@ATTRIBUTES",
2476                           "y": "UNIQUE_INDEX"})
2477             self.fail()
2478         except ldb.LdbError:
2479             pass
2480
2481         # We must still have a working index
2482         res = self.ldb.search(expression="(y=1)",
2483                               base="dc=samba,dc=org")
2484         self.assertEqual(len(res), 3)
2485
2486     def test_unique_transaction(self):
2487         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2488                       "objectUUID": b"0123456789abcde1",
2489                       "y": "1"})
2490         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2491                       "objectUUID": b"0123456789abcde2",
2492                       "y": "1"})
2493         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2494                       "objectUUID": b"0123456789abcde3",
2495                       "y": "1"})
2496
2497         res = self.ldb.search(expression="(y=1)",
2498                               base="dc=samba,dc=org")
2499         self.assertEqual(len(res), 3)
2500
2501         self.ldb.transaction_start()
2502
2503         # Now set this to unique index, but forget to check the result
2504         try:
2505             self.ldb.add({"dn": "@ATTRIBUTES",
2506                           "y": "UNIQUE_INDEX"})
2507         except ldb.LdbError:
2508             pass
2509
2510         try:
2511             self.ldb.transaction_commit()
2512             self.fail()
2513
2514         except ldb.LdbError as err:
2515             enum = err.args[0]
2516             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2517
2518         # We must still have a working index
2519         res = self.ldb.search(expression="(y=1)",
2520                               base="dc=samba,dc=org")
2521
2522         self.assertEqual(len(res), 3)
2523
2524     def test_casefold(self):
2525         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2526                       "objectUUID": b"0123456789abcde1",
2527                       "y": "a"})
2528         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2529                       "objectUUID": b"0123456789abcde2",
2530                       "y": "A"})
2531         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2532                       "objectUUID": b"0123456789abcde3",
2533                       "y": ["a", "A"]})
2534
2535         res = self.ldb.search(expression="(y=a)",
2536                               base="dc=samba,dc=org")
2537         self.assertEqual(len(res), 2)
2538
2539         self.ldb.add({"dn": "@ATTRIBUTES",
2540                       "y": "CASE_INSENSITIVE"})
2541
2542         # We must still have a working index
2543         res = self.ldb.search(expression="(y=a)",
2544                               base="dc=samba,dc=org")
2545
2546         if hasattr(self, 'IDXGUID'):
2547             self.assertEqual(len(res), 3)
2548         else:
2549             # We should not return this entry twice, but sadly
2550             # we have not yet fixed
2551             # https://bugzilla.samba.org/show_bug.cgi?id=13361
2552             self.assertEqual(len(res), 4)
2553
2554     def test_casefold_transaction(self):
2555         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
2556                       "objectUUID": b"0123456789abcde1",
2557                       "y": "a"})
2558         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2559                       "objectUUID": b"0123456789abcde2",
2560                       "y": "A"})
2561         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
2562                       "objectUUID": b"0123456789abcde3",
2563                       "y": ["a", "A"]})
2564
2565         res = self.ldb.search(expression="(y=a)",
2566                               base="dc=samba,dc=org")
2567         self.assertEqual(len(res), 2)
2568
2569         self.ldb.transaction_start()
2570
2571         self.ldb.add({"dn": "@ATTRIBUTES",
2572                       "y": "CASE_INSENSITIVE"})
2573
2574         self.ldb.transaction_commit()
2575
2576         # We must still have a working index
2577         res = self.ldb.search(expression="(y=a)",
2578                               base="dc=samba,dc=org")
2579
2580         if hasattr(self, 'IDXGUID'):
2581             self.assertEqual(len(res), 3)
2582         else:
2583             # We should not return this entry twice, but sadly
2584             # we have not yet fixed
2585             # https://bugzilla.samba.org/show_bug.cgi?id=13361
2586             self.assertEqual(len(res), 4)
2587
2588     def test_modify_transaction(self):
2589         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2590                       "objectUUID": b"0123456789abcde1",
2591                       "y": "2",
2592                       "z": "2"})
2593
2594         res = self.ldb.search(expression="(y=2)",
2595                               base="dc=samba,dc=org")
2596         self.assertEqual(len(res), 1)
2597
2598         self.ldb.add({"dn": "@ATTRIBUTES",
2599                       "y": "UNIQUE_INDEX"})
2600
2601         self.ldb.transaction_start()
2602
2603         m = ldb.Message()
2604         m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2605         m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2606         m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2607
2608         try:
2609             self.ldb.modify(m)
2610             self.fail()
2611
2612         except ldb.LdbError as err:
2613             enum = err.args[0]
2614             self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2615
2616         try:
2617             self.ldb.transaction_commit()
2618             # We should fail here, but we want to be sure
2619             # we fail below
2620
2621         except ldb.LdbError as err:
2622             enum = err.args[0]
2623             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2624
2625         # The index should still be pointing to x=y
2626         res = self.ldb.search(expression="(y=2)",
2627                               base="dc=samba,dc=org")
2628         self.assertEqual(len(res), 1)
2629
2630         try:
2631             self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
2632                         "objectUUID": b"0123456789abcde2",
2633                         "y": "2"})
2634             self.fail("Added unique attribute twice")
2635         except ldb.LdbError as err:
2636             enum = err.args[0]
2637             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
2638
2639         res = self.ldb.search(expression="(y=2)",
2640                               base="dc=samba,dc=org")
2641         self.assertEqual(len(res), 1)
2642         self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
2643
2644     def tearDown(self):
2645         super(BadIndexTests, self).tearDown()
2646
2647
2648 class GUIDBadIndexTests(BadIndexTests):
2649     """Test Bad index things with GUID index mode"""
2650
2651     def setUp(self):
2652         self.IDXGUID = True
2653
2654         super(GUIDBadIndexTests, self).setUp()
2655
2656
2657 class GUIDBadIndexTestsLmdb(BadIndexTests):
2658
2659     def setUp(self):
2660         if os.environ.get('HAVE_LMDB', '1') == '0':
2661             self.skipTest("No lmdb backend")
2662         self.prefix = MDB_PREFIX
2663         self.index = MDB_INDEX_OBJ
2664         self.IDXGUID = True
2665         super(GUIDBadIndexTestsLmdb, self).setUp()
2666
2667     def tearDown(self):
2668         super(GUIDBadIndexTestsLmdb, self).tearDown()
2669
2670
2671 class BatchModeTests(LdbBaseTest):
2672
2673     def setUp(self):
2674         super(BatchModeTests, self).setUp()
2675         self.testdir = tempdir()
2676         self.filename = os.path.join(self.testdir, "test.ldb")
2677         self.ldb = ldb.Ldb(self.url(),
2678                            flags=self.flags(),
2679                            options=["batch_mode:1"])
2680         if hasattr(self, 'IDXGUID'):
2681             self.ldb.add({"dn": "@INDEXLIST",
2682                           "@IDXATTR": [b"x", b"y", b"ou"],
2683                           "@IDXGUID": [b"objectUUID"],
2684                           "@IDX_DN_GUID": [b"GUID"]})
2685         else:
2686             self.ldb.add({"dn": "@INDEXLIST",
2687                           "@IDXATTR": [b"x", b"y", b"ou"]})
2688
2689     def test_modify_transaction(self):
2690         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
2691                       "objectUUID": b"0123456789abcde1",
2692                       "y": "2",
2693                       "z": "2"})
2694
2695         res = self.ldb.search(expression="(y=2)",
2696                               base="dc=samba,dc=org")
2697         self.assertEqual(len(res), 1)
2698
2699         self.ldb.add({"dn": "@ATTRIBUTES",
2700                       "y": "UNIQUE_INDEX"})
2701
2702         self.ldb.transaction_start()
2703
2704         m = ldb.Message()
2705         m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
2706         m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
2707         m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
2708
2709         try:
2710             self.ldb.modify(m)
2711             self.fail()
2712
2713         except ldb.LdbError as err:
2714             enum = err.args[0]
2715             self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
2716
2717         try:
2718             self.ldb.transaction_commit()
2719             self.fail("Commit should have failed as we were in batch mode")
2720         except ldb.LdbError as err:
2721             enum = err.args[0]
2722             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
2723
2724     def tearDown(self):
2725         super(BatchModeTests, self).tearDown()
2726
2727
2728 class DnTests(TestCase):
2729
2730     def setUp(self):
2731         super(DnTests, self).setUp()
2732         self.ldb = ldb.Ldb()
2733
2734     def tearDown(self):
2735         super(DnTests, self).tearDown()
2736         del(self.ldb)
2737
2738     def test_set_dn_invalid(self):
2739         x = ldb.Message()
2740
2741         def assign():
2742             x.dn = "astring"
2743         self.assertRaises(TypeError, assign)
2744
2745     def test_eq(self):
2746         x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2747         y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2748         self.assertEqual(x, y)
2749         y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
2750         self.assertNotEqual(x, y)
2751
2752     def test_str(self):
2753         x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
2754         self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
2755
2756     def test_repr(self):
2757         x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
2758         self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
2759
2760     def test_get_casefold_2(self):
2761         x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
2762         self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
2763
2764     def test_validate(self):
2765         x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
2766         self.assertTrue(x.validate())
2767
2768     def test_parent(self):
2769         x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
2770         self.assertEqual("bar=bloe", x.parent().__str__())
2771
2772     def test_parent_nonexistent(self):
2773         x = ldb.Dn(self.ldb, "@BLA")
2774         self.assertEqual(None, x.parent())
2775
2776     def test_is_valid(self):
2777         x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
2778         self.assertTrue(x.is_valid())
2779         x = ldb.Dn(self.ldb, "")
2780         self.assertTrue(x.is_valid())
2781
2782     def test_is_special(self):
2783         x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
2784         self.assertFalse(x.is_special())
2785         x = ldb.Dn(self.ldb, "@FOOBAR")
2786         self.assertTrue(x.is_special())
2787
2788     def test_check_special(self):
2789         x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
2790         self.assertFalse(x.check_special("FOOBAR"))
2791         x = ldb.Dn(self.ldb, "@FOOBAR")
2792         self.assertTrue(x.check_special("@FOOBAR"))
2793
2794     def test_len(self):
2795         x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
2796         self.assertEqual(2, len(x))
2797         x = ldb.Dn(self.ldb, "dc=foo21")
2798         self.assertEqual(1, len(x))
2799
2800     def test_add_child(self):
2801         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2802         self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
2803         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2804
2805     def test_add_base(self):
2806         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2807         base = ldb.Dn(self.ldb, "bla=bloe")
2808         self.assertTrue(x.add_base(base))
2809         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2810
2811     def test_add_child_str(self):
2812         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2813         self.assertTrue(x.add_child("bla=bloe"))
2814         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2815
2816     def test_add_base_str(self):
2817         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2818         base = "bla=bloe"
2819         self.assertTrue(x.add_base(base))
2820         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2821
2822     def test_add(self):
2823         x = ldb.Dn(self.ldb, "dc=foo24")
2824         y = ldb.Dn(self.ldb, "bar=bla")
2825         self.assertEqual("dc=foo24,bar=bla", str(x + y))
2826
2827     def test_remove_base_components(self):
2828         x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
2829         x.remove_base_components(len(x) - 1)
2830         self.assertEqual("dc=foo24", str(x))
2831
2832     def test_parse_ldif(self):
2833         msgs = self.ldb.parse_ldif("dn: foo=bar\n")
2834         msg = next(msgs)
2835         self.assertEqual("foo=bar", str(msg[1].dn))
2836         self.assertTrue(isinstance(msg[1], ldb.Message))
2837         ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
2838         self.assertEqual("dn: foo=bar\n\n", ldif)
2839
2840     def test_parse_ldif_more(self):
2841         msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
2842         msg = next(msgs)
2843         self.assertEqual("foo=bar", str(msg[1].dn))
2844         msg = next(msgs)
2845         self.assertEqual("bar=bar", str(msg[1].dn))
2846
2847     def test_print_ldif(self):
2848         ldif = '''dn: dc=foo27
2849 foo: foo
2850
2851 '''
2852         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2853         self.msg["foo"] = [b"foo"]
2854         self.assertEqual(ldif,
2855                          self.ldb.write_ldif(self.msg,
2856                                              ldb.CHANGETYPE_NONE))
2857
2858     def test_print_ldif_binary(self):
2859         # this also confirms that ldb flags are set even without a URL)
2860         self.ldb = ldb.Ldb(flags=ldb.FLG_SHOW_BINARY)
2861         ldif = '''dn: dc=foo27
2862 foo: f
2863 öö
2864
2865 '''
2866         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2867         self.msg["foo"] = ["f\nöö"]
2868         self.assertEqual(ldif,
2869                          self.ldb.write_ldif(self.msg,
2870                                              ldb.CHANGETYPE_NONE))
2871
2872
2873     def test_print_ldif_no_base64_bad(self):
2874         ldif = '''dn: dc=foo27
2875 foo: f
2876 öö
2877
2878 '''
2879         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2880         self.msg["foo"] = ["f\nöö"]
2881         self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2882         self.assertEqual(ldif,
2883                          self.ldb.write_ldif(self.msg,
2884                                              ldb.CHANGETYPE_NONE))
2885
2886     def test_print_ldif_no_base64_good(self):
2887         ldif = '''dn: dc=foo27
2888 foo: föö
2889
2890 '''
2891         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2892         self.msg["foo"] = ["föö"]
2893         self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2894         self.assertEqual(ldif,
2895                          self.ldb.write_ldif(self.msg,
2896                                              ldb.CHANGETYPE_NONE))
2897
2898     def test_canonical_string(self):
2899         x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
2900         self.assertEqual("/bloe/foo25", x.canonical_str())
2901
2902     def test_canonical_ex_string(self):
2903         x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
2904         self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
2905
2906     def test_ldb_is_child_of(self):
2907         """Testing ldb_dn_compare_dn"""
2908         dn1 = ldb.Dn(self.ldb, "dc=base")
2909         dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
2910         dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
2911         dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
2912
2913         self.assertTrue(dn1.is_child_of(dn1))
2914         self.assertTrue(dn2.is_child_of(dn1))
2915         self.assertTrue(dn4.is_child_of(dn1))
2916         self.assertTrue(dn4.is_child_of(dn3))
2917         self.assertTrue(dn4.is_child_of(dn4))
2918         self.assertFalse(dn3.is_child_of(dn2))
2919         self.assertFalse(dn1.is_child_of(dn4))
2920
2921     def test_ldb_is_child_of_str(self):
2922         """Testing ldb_dn_compare_dn"""
2923         dn1_str = "dc=base"
2924         dn2_str = "cn=foo,dc=base"
2925         dn3_str = "cn=bar,dc=base"
2926         dn4_str = "cn=baz,cn=bar,dc=base"
2927
2928         dn1 = ldb.Dn(self.ldb, dn1_str)
2929         dn2 = ldb.Dn(self.ldb, dn2_str)
2930         dn3 = ldb.Dn(self.ldb, dn3_str)
2931         dn4 = ldb.Dn(self.ldb, dn4_str)
2932
2933         self.assertTrue(dn1.is_child_of(dn1_str))
2934         self.assertTrue(dn2.is_child_of(dn1_str))
2935         self.assertTrue(dn4.is_child_of(dn1_str))
2936         self.assertTrue(dn4.is_child_of(dn3_str))
2937         self.assertTrue(dn4.is_child_of(dn4_str))
2938         self.assertFalse(dn3.is_child_of(dn2_str))
2939         self.assertFalse(dn1.is_child_of(dn4_str))
2940
2941     def test_get_component_name(self):
2942         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2943         self.assertEqual(dn.get_component_name(0), 'cn')
2944         self.assertEqual(dn.get_component_name(1), 'dc')
2945         self.assertEqual(dn.get_component_name(2), None)
2946         self.assertEqual(dn.get_component_name(-1), None)
2947
2948     def test_get_component_value(self):
2949         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2950         self.assertEqual(dn.get_component_value(0), 'foo')
2951         self.assertEqual(dn.get_component_value(1), 'base')
2952         self.assertEqual(dn.get_component_name(2), None)
2953         self.assertEqual(dn.get_component_name(-1), None)
2954
2955     def test_set_component(self):
2956         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2957         dn.set_component(0, 'cn', 'bar')
2958         self.assertEqual(str(dn), "cn=bar,dc=base")
2959         dn.set_component(1, 'o', 'asep')
2960         self.assertEqual(str(dn), "cn=bar,o=asep")
2961         self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
2962         self.assertEqual(str(dn), "cn=bar,o=asep")
2963         dn.set_component(1, 'o', 'a,b+c')
2964         self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
2965
2966     def test_set_component_bytes(self):
2967         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2968         dn.set_component(0, 'cn', b'bar')
2969         self.assertEqual(str(dn), "cn=bar,dc=base")
2970         dn.set_component(1, 'o', b'asep')
2971         self.assertEqual(str(dn), "cn=bar,o=asep")
2972
2973     def test_set_component_none(self):
2974         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2975         self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
2976
2977     def test_get_extended_component_null(self):
2978         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2979         self.assertEqual(dn.get_extended_component("TEST"), None)
2980
2981     def test_get_extended_component(self):
2982         self.ldb._register_test_extensions()
2983         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2984         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2985
2986     def test_set_extended_component(self):
2987         self.ldb._register_test_extensions()
2988         dn = ldb.Dn(self.ldb, "dc=base")
2989         dn.set_extended_component("TEST", "foo")
2990         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2991         dn.set_extended_component("TEST", b"bar")
2992         self.assertEqual(dn.get_extended_component("TEST"), b"bar")
2993
2994     def test_extended_str(self):
2995         self.ldb._register_test_extensions()
2996         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2997         self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
2998
2999     def test_get_rdn_name(self):
3000         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3001         self.assertEqual(dn.get_rdn_name(), 'cn')
3002
3003     def test_get_rdn_value(self):
3004         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3005         self.assertEqual(dn.get_rdn_value(), 'foo')
3006
3007     def test_get_casefold(self):
3008         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3009         self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
3010
3011     def test_get_linearized(self):
3012         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3013         self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
3014
3015     def test_is_null(self):
3016         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
3017         self.assertFalse(dn.is_null())
3018
3019         dn = ldb.Dn(self.ldb, '')
3020         self.assertTrue(dn.is_null())
3021
3022
3023 class LdbMsgTests(TestCase):
3024
3025     def setUp(self):
3026         super(LdbMsgTests, self).setUp()
3027         self.msg = ldb.Message()
3028
3029     def test_init_dn(self):
3030         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
3031         self.assertEqual("dc=foo27", str(self.msg.dn))
3032
3033     def test_iter_items(self):
3034         self.assertEqual(0, len(self.msg.items()))
3035         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
3036         self.assertEqual(1, len(self.msg.items()))
3037
3038     def test_items(self):
3039         self.msg["foo"] = ["foo"]
3040         self.msg["bar"] = ["bar"]
3041         try:
3042             items = self.msg.items()
3043         except:
3044             self.fail()
3045         self.assertEqual([("foo", ldb.MessageElement(["foo"])),
3046                           ("bar", ldb.MessageElement(["bar"]))],
3047                          items)
3048
3049         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=test")
3050         try:
3051             items = self.msg.items()
3052         except:
3053             self.fail()
3054         self.assertEqual([("dn", ldb.Dn(ldb.Ldb(), "dc=test")),
3055                           ("foo", ldb.MessageElement(["foo"])),
3056                           ("bar", ldb.MessageElement(["bar"]))],
3057                          items)
3058
3059     def test_repr(self):
3060         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
3061         self.msg["dc"] = b"foo"
3062         self.assertIn(repr(self.msg), [
3063             "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
3064             "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
3065         ])
3066         self.assertIn(repr(self.msg.text), [
3067             "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
3068             "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
3069         ])
3070
3071     def test_len(self):
3072         self.assertEqual(0, len(self.msg))
3073
3074     def test_notpresent(self):
3075         self.assertRaises(KeyError, lambda: self.msg["foo"])
3076
3077     def test_invalid(self):
3078         try:
3079             self.assertRaises(TypeError, lambda: self.msg[42])
3080         except KeyError:
3081             self.fail()
3082
3083     def test_del(self):
3084         del self.msg["foo"]
3085
3086     def test_add(self):
3087         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
3088
3089     def test_add_text(self):
3090         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
3091
3092     def test_elements_empty(self):
3093         self.assertEqual([], self.msg.elements())
3094
3095     def test_elements(self):
3096         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
3097         self.msg.add(el)
3098         self.assertEqual([el], self.msg.elements())
3099         self.assertEqual([el.text], self.msg.text.elements())
3100
3101     def test_add_value(self):
3102         self.assertEqual(0, len(self.msg))
3103         self.msg["foo"] = [b"foo"]
3104         self.assertEqual(1, len(self.msg))
3105
3106     def test_add_value_text(self):
3107         self.assertEqual(0, len(self.msg))
3108         self.msg["foo"] = ["foo"]
3109         self.assertEqual(1, len(self.msg))
3110
3111     def test_add_value_multiple(self):
3112         self.assertEqual(0, len(self.msg))
3113         self.msg["foo"] = [b"foo", b"bla"]
3114         self.assertEqual(1, len(self.msg))
3115         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
3116
3117     def test_add_value_multiple_text(self):
3118         self.assertEqual(0, len(self.msg))
3119         self.msg["foo"] = ["foo", "bla"]
3120         self.assertEqual(1, len(self.msg))
3121         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
3122
3123     def test_set_value(self):
3124         self.msg["foo"] = [b"fool"]
3125         self.assertEqual([b"fool"], list(self.msg["foo"]))
3126         self.msg["foo"] = [b"bar"]
3127         self.assertEqual([b"bar"], list(self.msg["foo"]))
3128
3129     def test_set_value_text(self):
3130         self.msg["foo"] = ["fool"]
3131         self.assertEqual(["fool"], list(self.msg.text["foo"]))
3132         self.msg["foo"] = ["bar"]
3133         self.assertEqual(["bar"], list(self.msg.text["foo"]))
3134
3135     def test_keys(self):
3136         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3137         self.msg["foo"] = [b"bla"]
3138         self.msg["bar"] = [b"bla"]
3139         self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))
3140
3141     def test_keys_text(self):
3142         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3143         self.msg["foo"] = ["bla"]
3144         self.msg["bar"] = ["bla"]
3145         self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))
3146
3147     def test_dn(self):
3148         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3149         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
3150
3151     def test_get_dn(self):
3152         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3153         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
3154
3155     def test_dn_text(self):
3156         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3157         self.assertEqual("@BASEINFO", str(self.msg.dn))
3158         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
3159
3160     def test_get_dn_text(self):
3161         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3162         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
3163         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
3164
3165     def test_get_invalid(self):
3166         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
3167         self.assertRaises(TypeError, self.msg.get, 42)
3168
3169     def test_get_other(self):
3170         self.msg["foo"] = [b"bar"]
3171         self.assertEqual(b"bar", self.msg.get("foo")[0])
3172         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
3173         self.assertEqual(None, self.msg.get("foo", idx=1))
3174         self.assertEqual("", self.msg.get("foo", default='', idx=1))
3175
3176     def test_get_other_text(self):
3177         self.msg["foo"] = ["bar"]
3178         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
3179         self.assertEqual("bar", self.msg.text.get("foo")[0])
3180         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
3181         self.assertEqual(None, self.msg.get("foo", idx=1))
3182         self.assertEqual("", self.msg.get("foo", default='', idx=1))
3183
3184     def test_get_default(self):
3185         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
3186         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
3187
3188     def test_get_default_text(self):
3189         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
3190         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
3191
3192     def test_get_unknown(self):
3193         self.assertEqual(None, self.msg.get("lalalala"))
3194
3195     def test_get_unknown_text(self):
3196         self.assertEqual(None, self.msg.text.get("lalalala"))
3197
3198     def test_contains(self):
3199         self.msg['foo'] = ['bar']
3200         self.assertIn('foo', self.msg)
3201
3202         self.msg['Foo'] = ['bar']
3203         self.assertIn('Foo', self.msg)
3204
3205     def test_contains_case(self):
3206         self.msg['foo'] = ['bar']
3207         self.assertIn('Foo', self.msg)
3208
3209         self.msg['Foo'] = ['bar']
3210         self.assertIn('foo', self.msg)
3211
3212     def test_contains_dn(self):
3213         self.assertIn('dn', self.msg)
3214
3215     def test_contains_dn_case(self):
3216         self.assertIn('DN', self.msg)
3217
3218     def test_contains_invalid(self):
3219         self.assertRaises(TypeError, lambda: None in self.msg)
3220
3221     def test_msg_diff(self):
3222         l = ldb.Ldb()
3223         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
3224         msg1 = next(msgs)[1]
3225         msg2 = next(msgs)[1]
3226         msgdiff = l.msg_diff(msg1, msg2)
3227         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
3228         self.assertRaises(KeyError, lambda: msgdiff["foo"])
3229         self.assertEqual(1, len(msgdiff))
3230
3231     def test_equal_empty(self):
3232         msg1 = ldb.Message()
3233         msg2 = ldb.Message()
3234         self.assertEqual(msg1, msg2)
3235
3236     def test_equal_simplel(self):
3237         db = ldb.Ldb()
3238         msg1 = ldb.Message()
3239         msg1.dn = ldb.Dn(db, "foo=bar")
3240         msg2 = ldb.Message()
3241         msg2.dn = ldb.Dn(db, "foo=bar")
3242         self.assertEqual(msg1, msg2)
3243         msg1['foo'] = b'bar'
3244         msg2['foo'] = b'bar'
3245         self.assertEqual(msg1, msg2)
3246         msg2['foo'] = b'blie'
3247         self.assertNotEqual(msg1, msg2)
3248         msg2['foo'] = b'blie'
3249
3250     def test_from_dict(self):
3251         rec = {"dn": "dc=fromdict",
3252                "a1": [b"a1-val1", b"a1-val1"]}
3253         l = ldb.Ldb()
3254         # check different types of input Flags
3255         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
3256             m = ldb.Message.from_dict(l, rec, flags)
3257             self.assertEqual(rec["a1"], list(m["a1"]))
3258             self.assertEqual(flags, m["a1"].flags())
3259         # check input params
3260         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
3261         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
3262         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
3263         # Message.from_dict expects dictionary with 'dn'
3264         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
3265         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
3266
3267     def test_from_dict_text(self):
3268         rec = {"dn": "dc=fromdict",
3269                "a1": ["a1-val1", "a1-val1"]}
3270         l = ldb.Ldb()
3271         # check different types of input Flags
3272         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
3273             m = ldb.Message.from_dict(l, rec, flags)
3274             self.assertEqual(rec["a1"], list(m.text["a1"]))
3275             self.assertEqual(flags, m.text["a1"].flags())
3276         # check input params
3277         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
3278         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
3279         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
3280         # Message.from_dict expects dictionary with 'dn'
3281         err_rec = {"a1": ["a1-val1", "a1-val1"]}
3282         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
3283
3284     def test_copy_add_message_element(self):
3285         m = ldb.Message()
3286         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
3287         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
3288         mto = ldb.Message()
3289         mto["1"] = m["1"]
3290         mto["2"] = m["2"]
3291         self.assertEqual(mto["1"], m["1"])
3292         self.assertEqual(mto["2"], m["2"])
3293         mto = ldb.Message()
3294         mto.add(m["1"])
3295         mto.add(m["2"])
3296         self.assertEqual(mto["1"], m["1"])
3297         self.assertEqual(mto["2"], m["2"])
3298
3299     def test_copy_add_message_element_text(self):
3300         m = ldb.Message()
3301         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
3302         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
3303         mto = ldb.Message()
3304         mto["1"] = m["1"]
3305         mto["2"] = m["2"]
3306         self.assertEqual(mto["1"], m.text["1"])
3307         self.assertEqual(mto["2"], m.text["2"])
3308         mto = ldb.Message()
3309         mto.add(m["1"])
3310         mto.add(m["2"])
3311         self.assertEqual(mto.text["1"], m.text["1"])
3312         self.assertEqual(mto.text["2"], m.text["2"])
3313         self.assertEqual(mto["1"], m["1"])
3314         self.assertEqual(mto["2"], m["2"])
3315
3316
3317 class MessageElementTests(TestCase):
3318
3319     def test_cmp_element(self):
3320         x = ldb.MessageElement([b"foo"])
3321         y = ldb.MessageElement([b"foo"])
3322         z = ldb.MessageElement([b"bzr"])
3323         self.assertEqual(x, y)
3324         self.assertNotEqual(x, z)
3325
3326     def test_cmp_element_text(self):
3327         x = ldb.MessageElement([b"foo"])
3328         y = ldb.MessageElement(["foo"])
3329         self.assertEqual(x, y)
3330
3331     def test_create_iterable(self):
3332         x = ldb.MessageElement([b"foo"])
3333         self.assertEqual([b"foo"], list(x))
3334         self.assertEqual(["foo"], list(x.text))
3335
3336     def test_repr(self):
3337         x = ldb.MessageElement([b"foo"])
3338         self.assertEqual("MessageElement([b'foo'])", repr(x))
3339         self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
3340         x = ldb.MessageElement([b"foo", b"bla"])
3341         self.assertEqual(2, len(x))
3342         self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
3343         self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
3344
3345     def test_get_item(self):
3346         x = ldb.MessageElement([b"foo", b"bar"])
3347         self.assertEqual(b"foo", x[0])
3348         self.assertEqual(b"bar", x[1])
3349         self.assertEqual(b"bar", x[-1])
3350         self.assertRaises(IndexError, lambda: x[45])
3351
3352     def test_get_item_text(self):
3353         x = ldb.MessageElement(["foo", "bar"])
3354         self.assertEqual("foo", x.text[0])
3355         self.assertEqual("bar", x.text[1])
3356         self.assertEqual("bar", x.text[-1])
3357         self.assertRaises(IndexError, lambda: x[45])
3358
3359     def test_len(self):
3360         x = ldb.MessageElement([b"foo", b"bar"])
3361         self.assertEqual(2, len(x))
3362
3363     def test_eq(self):
3364         x = ldb.MessageElement([b"foo", b"bar"])
3365         y = ldb.MessageElement([b"foo", b"bar"])
3366         self.assertEqual(y, x)
3367         x = ldb.MessageElement([b"foo"])
3368         self.assertNotEqual(y, x)
3369         y = ldb.MessageElement([b"foo"])
3370         self.assertEqual(y, x)
3371
3372     def test_extended(self):
3373         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
3374         self.assertEqual("MessageElement([b'456'])", repr(el))
3375         self.assertEqual("MessageElement([b'456']).text", repr(el.text))
3376
3377     def test_bad_text(self):
3378         el = ldb.MessageElement(b'\xba\xdd')
3379         self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
3380
3381
3382 class ModuleTests(TestCase):
3383
3384     def setUp(self):
3385         super(ModuleTests, self).setUp()
3386         self.testdir = tempdir()
3387         self.filename = os.path.join(self.testdir, "test.ldb")
3388         self.ldb = ldb.Ldb(self.filename)
3389
3390     def tearDown(self):
3391         shutil.rmtree(self.testdir)
3392         super(ModuleTests, self).setUp()
3393
3394     def test_register_module(self):
3395         class ExampleModule:
3396             name = "example"
3397         ldb.register_module(ExampleModule)
3398
3399     def test_use_module(self):
3400         ops = []
3401
3402         class ExampleModule:
3403             name = "bla"
3404
3405             def __init__(self, ldb, next):
3406                 ops.append("init")
3407                 self.next = next
3408
3409             def search(self, *args, **kwargs):
3410                 return self.next.search(*args, **kwargs)
3411
3412             def request(self, *args, **kwargs):
3413                 pass
3414
3415         ldb.register_module(ExampleModule)
3416         l = ldb.Ldb(self.filename)
3417         l.add({"dn": "@MODULES", "@LIST": "bla"})
3418         self.assertEqual([], ops)
3419         l = ldb.Ldb(self.filename)
3420         self.assertEqual(["init"], ops)
3421
3422
3423 class LdbResultTests(LdbBaseTest):
3424
3425     def setUp(self):
3426         super(LdbResultTests, self).setUp()
3427         self.testdir = tempdir()
3428         self.filename = os.path.join(self.testdir, "test.ldb")
3429         self.l = ldb.Ldb(self.url(), flags=self.flags())
3430         try:
3431             self.l.add(self.index)
3432         except AttributeError:
3433             pass
3434         self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
3435                     "objectUUID": b"0123456789abcde0"})
3436         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
3437                     "objectUUID": b"0123456789abcde1"})
3438         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
3439                     "objectUUID": b"0123456789abcde2"})
3440         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
3441                     "objectUUID": b"0123456789abcde3"})
3442         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
3443                     "objectUUID": b"0123456789abcde4"})
3444         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
3445                     "objectUUID": b"0123456789abcde5"})
3446         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
3447                     "objectUUID": b"0123456789abcde6"})
3448         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
3449                     "objectUUID": b"0123456789abcde7"})
3450         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
3451                     "objectUUID": b"0123456789abcde8"})
3452         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
3453                     "objectUUID": b"0123456789abcde9"})
3454         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
3455                     "objectUUID": b"0123456789abcdea"})
3456         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
3457                     "objectUUID": b"0123456789abcdeb"})
3458         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
3459                     "objectUUID": b"0123456789abcdec"})
3460
3461     def tearDown(self):
3462         shutil.rmtree(self.testdir)
3463         super(LdbResultTests, self).tearDown()
3464         # Ensure the LDB is closed now, so we close the FD
3465         del(self.l)
3466
3467     def test_return_type(self):
3468         res = self.l.search()
3469         self.assertEqual(str(res), "<ldb result>")
3470
3471     def test_get_msgs(self):
3472         res = self.l.search()
3473         list = res.msgs
3474
3475     def test_get_controls(self):
3476         res = self.l.search()
3477         list = res.controls
3478
3479     def test_get_referals(self):
3480         res = self.l.search()
3481         list = res.referals
3482
3483     def test_iter_msgs(self):
3484         found = False
3485         for l in self.l.search().msgs:
3486             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3487                 found = True
3488         self.assertTrue(found)
3489
3490     def test_iter_msgs_count(self):
3491         self.assertTrue(self.l.search().count > 0)
3492         # 13 objects has been added to the DC=SAMBA, DC=ORG
3493         self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
3494
3495     def test_iter_controls(self):
3496         res = self.l.search().controls
3497         it = iter(res)
3498
3499     def test_create_control(self):
3500         self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
3501         c = ldb.Control(self.l, "relax:1")
3502         self.assertEqual(c.critical, True)
3503         self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
3504
3505     def test_iter_refs(self):
3506         res = self.l.search().referals
3507         it = iter(res)
3508
3509     def test_search_sequence_msgs(self):
3510         found = False
3511         res = self.l.search().msgs
3512
3513         for i in range(0, len(res)):
3514             l = res[i]
3515             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3516                 found = True
3517         self.assertTrue(found)
3518
3519     def test_search_as_iter(self):
3520         found = False
3521         res = self.l.search()
3522
3523         for l in res:
3524             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3525                 found = True
3526         self.assertTrue(found)
3527
3528     def test_search_iter(self):
3529         found = False
3530         res = self.l.search_iterator()
3531
3532         for l in res:
3533             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3534                 found = True
3535         self.assertTrue(found)
3536
3537     # Show that search results can't see into a transaction
3538
3539     def test_search_against_trans(self):
3540         found11 = False
3541
3542         (r1, w1) = os.pipe()
3543
3544         (r2, w2) = os.pipe()
3545
3546         # For the first element, fork a child that will
3547         # write to the DB
3548         pid = os.fork()
3549         if pid == 0:
3550             # In the child, re-open
3551             del(self.l)
3552             gc.collect()
3553
3554             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3555             # start a transaction
3556             child_ldb.transaction_start()
3557
3558             # write to it
3559             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3560                            "name": b"samba.org",
3561                            "objectUUID": b"o123456789acbdef"})
3562
3563             os.write(w1, b"added")
3564
3565             # Now wait for the search to be done
3566             os.read(r2, 6)
3567
3568             # and commit
3569             try:
3570                 child_ldb.transaction_commit()
3571             except ldb.LdbError as err:
3572                 # We print this here to see what went wrong in the child
3573                 print(err)
3574                 os._exit(1)
3575
3576             os.write(w1, b"transaction")
3577             os._exit(0)
3578
3579         self.assertEqual(os.read(r1, 5), b"added")
3580
3581         # This should not turn up until the transaction is concluded
3582         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3583                               scope=ldb.SCOPE_BASE)
3584         self.assertEqual(len(res11), 0)
3585
3586         os.write(w2, b"search")
3587
3588         # Now wait for the transaction to be done.  This should
3589         # deadlock, but the search doesn't hold a read lock for the
3590         # iterator lifetime currently.
3591         self.assertEqual(os.read(r1, 11), b"transaction")
3592
3593         # This should now turn up, as the transaction is over
3594         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3595                               scope=ldb.SCOPE_BASE)
3596         self.assertEqual(len(res11), 1)
3597
3598         self.assertFalse(found11)
3599
3600         (got_pid, status) = os.waitpid(pid, 0)
3601         self.assertEqual(got_pid, pid)
3602
3603     def test_search_iter_against_trans(self):
3604         found = False
3605         found11 = False
3606
3607         # We need to hold this iterator open to hold the all-record
3608         # lock
3609         res = self.l.search_iterator()
3610
3611         (r1, w1) = os.pipe()
3612
3613         (r2, w2) = os.pipe()
3614
3615         # For the first element, with the sequence open (which
3616         # means with ldb locks held), fork a child that will
3617         # write to the DB
3618         pid = os.fork()
3619         if pid == 0:
3620             # In the child, re-open
3621             del(res)
3622             del(self.l)
3623             gc.collect()
3624
3625             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
3626             # start a transaction
3627             child_ldb.transaction_start()
3628
3629             # write to it
3630             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
3631                            "name": b"samba.org",
3632                            "objectUUID": b"o123456789acbdef"})
3633
3634             os.write(w1, b"added")
3635
3636             # Now wait for the search to be done
3637             os.read(r2, 6)
3638
3639             # and commit
3640             try:
3641                 child_ldb.transaction_commit()
3642             except ldb.LdbError as err:
3643                 # We print this here to see what went wrong in the child
3644                 print(err)
3645                 os._exit(1)
3646
3647             os.write(w1, b"transaction")
3648             os._exit(0)
3649
3650         self.assertEqual(os.read(r1, 5), b"added")
3651
3652         # This should not turn up until the transaction is concluded
3653         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3654                               scope=ldb.SCOPE_BASE)
3655         self.assertEqual(len(res11), 0)
3656
3657         os.write(w2, b"search")
3658
3659         # allow the transaction to start
3660         time.sleep(1)
3661
3662         # This should not turn up until the search finishes and
3663         # removed the read lock, but for ldb_tdb that happened as soon
3664         # as we called the first res.next()
3665         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3666                               scope=ldb.SCOPE_BASE)
3667         self.assertEqual(len(res11), 0)
3668
3669         # These results are all collected at the first next(res) call
3670         for l in res:
3671             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
3672                 found = True
3673             if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
3674                 found11 = True
3675
3676         # Now wait for the transaction to be done.
3677         self.assertEqual(os.read(r1, 11), b"transaction")
3678
3679         # This should now turn up, as the transaction is over and all
3680         # read locks are gone
3681         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
3682                               scope=ldb.SCOPE_BASE)
3683         self.assertEqual(len(res11), 1)
3684
3685         self.assertTrue(found)
3686         self.assertFalse(found11)
3687
3688         (got_pid, status) = os.waitpid(pid, 0)
3689         self.assertEqual(got_pid, pid)
3690
3691
3692 class LdbResultTestsLmdb(LdbResultTests):
3693
3694     def setUp(self):
3695         if os.environ.get('HAVE_LMDB', '1') == '0':
3696             self.skipTest("No lmdb backend")
3697         self.prefix = MDB_PREFIX
3698         self.index = MDB_INDEX_OBJ
3699         super(LdbResultTestsLmdb, self).setUp()
3700
3701     def tearDown(self):
3702         super(LdbResultTestsLmdb, self).tearDown()
3703
3704
3705 class BadTypeTests(TestCase):
3706     def test_control(self):
3707         l = ldb.Ldb()
3708         self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
3709         self.assertRaises(TypeError, ldb.Control, ldb, 1234)
3710
3711     def test_modify(self):
3712         l = ldb.Ldb()
3713         dn = ldb.Dn(l, 'a=b')
3714         m = ldb.Message(dn)
3715         self.assertRaises(TypeError, l.modify, '<bad type>')
3716         self.assertRaises(TypeError, l.modify, m, '<bad type>')
3717
3718     def test_add(self):
3719         l = ldb.Ldb()
3720         dn = ldb.Dn(l, 'a=b')
3721         m = ldb.Message(dn)
3722         self.assertRaises(TypeError, l.add, '<bad type>')
3723         self.assertRaises(TypeError, l.add, m, '<bad type>')
3724
3725     def test_delete(self):
3726         l = ldb.Ldb()
3727         dn = ldb.Dn(l, 'a=b')
3728         self.assertRaises(TypeError, l.add, '<bad type>')
3729         self.assertRaises(TypeError, l.add, dn, '<bad type>')
3730
3731     def test_rename(self):
3732         l = ldb.Ldb()
3733         dn = ldb.Dn(l, 'a=b')
3734         self.assertRaises(TypeError, l.add, '<bad type>', dn)
3735         self.assertRaises(TypeError, l.add, dn, '<bad type>')
3736         self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
3737
3738     def test_search(self):
3739         l = ldb.Ldb()
3740         self.assertRaises(TypeError, l.search, base=1234)
3741         self.assertRaises(TypeError, l.search, scope='<bad type>')
3742         self.assertRaises(TypeError, l.search, expression=1234)
3743         self.assertRaises(TypeError, l.search, attrs='<bad type>')
3744         self.assertRaises(TypeError, l.search, controls='<bad type>')
3745
3746
3747 class VersionTests(TestCase):
3748
3749     def test_version(self):
3750         self.assertTrue(isinstance(ldb.__version__, str))
3751
3752 class NestedTransactionTests(LdbBaseTest):
3753     def setUp(self):
3754         super(NestedTransactionTests, self).setUp()
3755         self.testdir = tempdir()
3756         self.filename = os.path.join(self.testdir, "test.ldb")
3757         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
3758         self.ldb.add({"dn": "@INDEXLIST",
3759                       "@IDXATTR": [b"x", b"y", b"ou"],
3760                       "@IDXGUID": [b"objectUUID"],
3761                       "@IDX_DN_GUID": [b"GUID"]})
3762
3763         super(NestedTransactionTests, self).setUp()
3764
3765     #
3766     # This test documents that currently ldb does not support true nested
3767     # transactions.
3768     #
3769     # Note: The test is written so that it treats failure as pass.
3770     #       It is done this way as standalone ldb builds do not use the samba
3771     #       known fail mechanism
3772     #
3773     def test_nested_transactions(self):
3774
3775         self.ldb.transaction_start()
3776
3777         self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
3778                       "objectUUID": b"0123456789abcde1"})
3779         res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3780                               base="dc=samba,dc=org")
3781         self.assertEqual(len(res), 1)
3782
3783         self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
3784                       "objectUUID": b"0123456789abcde2"})
3785         res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3786                               base="dc=samba,dc=org")
3787         self.assertEqual(len(res), 1)
3788
3789         self.ldb.transaction_start()
3790         self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
3791                       "objectUUID": b"0123456789abcde3"})
3792         res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3793                               base="dc=samba,dc=org")
3794         self.assertEqual(len(res), 1)
3795         self.ldb.transaction_cancel()
3796         #
3797         # Check that we can not see the record added by the cancelled
3798         # transaction.
3799         # Currently this fails as ldb does not support true nested
3800         # transactions, and only the outer commits and cancels have an effect
3801         #
3802         res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3803                               base="dc=samba,dc=org")
3804         #
3805         # FIXME this test currently passes on a failure, i.e. if nested
3806         #       transaction support worked correctly the correct test would
3807         #       be.
3808         #         self.assertEqual(len(res), 0)
3809         #       as the add of objectUUID=0123456789abcde3 would reverted when
3810         #       the sub transaction it was nested in was rolled back.
3811         #
3812         #       Currently this is not the case so the record is still present.
3813         self.assertEqual(len(res), 1)
3814
3815
3816         # Commit the outer transaction
3817         #
3818         self.ldb.transaction_commit()
3819         #
3820         # Now check we can still see the records added in the outer
3821         # transaction.
3822         #
3823         res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
3824                               base="dc=samba,dc=org")
3825         self.assertEqual(len(res), 1)
3826         res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
3827                               base="dc=samba,dc=org")
3828         self.assertEqual(len(res), 1)
3829         #
3830         # And that we can't see the records added by the nested transaction.
3831         #
3832         res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
3833                               base="dc=samba,dc=org")
3834         # FIXME again if nested transactions worked correctly we would not
3835         #       see this record. The test should be.
3836         #         self.assertEqual(len(res), 0)
3837         self.assertEqual(len(res), 1)
3838
3839     def tearDown(self):
3840         super(NestedTransactionTests, self).tearDown()
3841
3842
3843 class LmdbNestedTransactionTests(NestedTransactionTests):
3844
3845     def setUp(self):
3846         if os.environ.get('HAVE_LMDB', '1') == '0':
3847             self.skipTest("No lmdb backend")
3848         self.prefix = MDB_PREFIX
3849         self.index = MDB_INDEX_OBJ
3850         super(LmdbNestedTransactionTests, self).setUp()
3851
3852     def tearDown(self):
3853         super(LmdbNestedTransactionTests, self).tearDown()
3854
3855
3856 if __name__ == '__main__':
3857     import unittest
3858     unittest.TestProgram()