ldb ldb_key_value: test ldb batch
[samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python3
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the interpreter running the tests is Python 3 or newer.
PY3 = sys.version_info[0] >= 3

# URL prefixes placed in front of the database filename by LdbBaseTest.url().
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"

# "@INDEXLIST" record that the lmdb test classes expose as ``self.index``
# so that setUp() adds it to a freshly created database.
MDB_INDEX_OBJ = {
    "dn": "@INDEXLIST",
    "@IDXONE": [b"1"],
    "@IDXGUID": [b"objectUUID"],
    "@IDX_DN_GUID": [b"GUID"],
}
24
25
def tempdir():
    """Create a new temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set, the directory is
    created inside its "tmp" subdirectory; otherwise tempfile's default
    location is used.
    """
    import tempfile
    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
33
34
class NoContextTests(TestCase):
    """Tests of module-level ldb helpers that need no open database."""

    def test_valid_attr_name(self):
        # A plain name is accepted; one starting with digits is rejected.
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        # Seconds-since-epoch values and the strings they must format to.
        samples = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, text in samples:
            self.assertEqual(text, ldb.timestring(seconds))

    def test_string_to_time(self):
        # The reverse direction of test_timestring.
        samples = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for text, seconds in samples:
            self.assertEqual(seconds, ldb.string_to_time(text))

    def test_binary_encode(self):
        # Encoding then decoding bytes must give back the input.
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, raw)

        # Text input must encode to the same value as the bytes input.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
56
57
class LdbBaseTest(TestCase):
    """Common base for tests that open an ldb database by URL.

    A subclass may set ``self.prefix`` before calling setUp(); when it is
    missing or None, TDB_PREFIX is used.  ``self.filename`` must be set by
    the subclass before url() is called.
    """

    def setUp(self):
        super(LdbBaseTest, self).setUp()
        # A missing attribute and an explicit None both select the default.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the prefix followed by the database filename."""
        prefix = self.prefix
        return prefix + self.filename

    def flags(self):
        """Return ldb.FLG_NOSYNC for the MDB prefix, otherwise 0."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
78
79
80 class SimpleLdb(LdbBaseTest):
81
82     def setUp(self):
83         super(SimpleLdb, self).setUp()
84         self.testdir = tempdir()
85         self.filename = os.path.join(self.testdir, "test.ldb")
86         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
87         try:
88             self.ldb.add(self.index)
89         except AttributeError:
90             pass
91
92     def tearDown(self):
93         shutil.rmtree(self.testdir)
94         super(SimpleLdb, self).tearDown()
95         # Ensure the LDB is closed now, so we close the FD
96         del(self.ldb)
97
98     def test_connect(self):
99         ldb.Ldb(self.url(), flags=self.flags())
100
101     def test_connect_none(self):
102         ldb.Ldb()
103
104     def test_connect_later(self):
105         x = ldb.Ldb()
106         x.connect(self.url(), flags=self.flags())
107
108     def test_repr(self):
109         x = ldb.Ldb()
110         self.assertTrue(repr(x).startswith("<ldb connection"))
111
112     def test_set_create_perms(self):
113         x = ldb.Ldb()
114         x.set_create_perms(0o600)
115
116     def test_modules_none(self):
117         x = ldb.Ldb()
118         self.assertEqual([], x.modules())
119
120     def test_modules_tdb(self):
121         x = ldb.Ldb(self.url(), flags=self.flags())
122         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
123
124     def test_firstmodule_none(self):
125         x = ldb.Ldb()
126         self.assertEqual(x.firstmodule, None)
127
128     def test_firstmodule_tdb(self):
129         x = ldb.Ldb(self.url(), flags=self.flags())
130         mod = x.firstmodule
131         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
132
133     def test_search(self):
134         l = ldb.Ldb(self.url(), flags=self.flags())
135         self.assertEqual(len(l.search()), 0)
136
137     def test_search_controls(self):
138         l = ldb.Ldb(self.url(), flags=self.flags())
139         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
140
141     def test_utf8_ldb_Dn(self):
142         l = ldb.Ldb(self.url(), flags=self.flags())
143         dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
144
145     def test_utf8_encoded_ldb_Dn(self):
146         l = ldb.Ldb(self.url(), flags=self.flags())
147         dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
148         try:
149             dn = ldb.Dn(l, dn_encoded_utf8)
150         except UnicodeDecodeError as e:
151                 raise
152         except TypeError as te:
153             if PY3:
154                p3errors = ["argument 2 must be str, not bytes",
155                            "Can't convert 'bytes' object to str implicitly"]
156                self.assertIn(str(te), p3errors)
157             else:
158                raise
159
160     def test_search_attrs(self):
161         l = ldb.Ldb(self.url(), flags=self.flags())
162         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
163
164     def test_search_string_dn(self):
165         l = ldb.Ldb(self.url(), flags=self.flags())
166         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
167
168     def test_search_attr_string(self):
169         l = ldb.Ldb(self.url(), flags=self.flags())
170         self.assertRaises(TypeError, l.search, attrs="dc")
171         self.assertRaises(TypeError, l.search, attrs=b"dc")
172
173     def test_opaque(self):
174         l = ldb.Ldb(self.url(), flags=self.flags())
175         l.set_opaque("my_opaque", l)
176         self.assertTrue(l.get_opaque("my_opaque") is not None)
177         self.assertEqual(None, l.get_opaque("unknown"))
178
179     def test_search_scope_base_empty_db(self):
180         l = ldb.Ldb(self.url(), flags=self.flags())
181         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
182                                       ldb.SCOPE_BASE)), 0)
183
184     def test_search_scope_onelevel_empty_db(self):
185         l = ldb.Ldb(self.url(), flags=self.flags())
186         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
187                                       ldb.SCOPE_ONELEVEL)), 0)
188
189     def test_delete(self):
190         l = ldb.Ldb(self.url(), flags=self.flags())
191         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
192
193     def test_delete_w_unhandled_ctrl(self):
194         l = ldb.Ldb(self.url(), flags=self.flags())
195         m = ldb.Message()
196         m.dn = ldb.Dn(l, "dc=foo1")
197         m["b"] = [b"a"]
198         m["objectUUID"] = b"0123456789abcdef"
199         l.add(m)
200         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
201         l.delete(m.dn)
202
203     def test_contains(self):
204         name = self.url()
205         l = ldb.Ldb(name, flags=self.flags())
206         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
207         l = ldb.Ldb(name, flags=self.flags())
208         m = ldb.Message()
209         m.dn = ldb.Dn(l, "dc=foo3")
210         m["b"] = ["a"]
211         m["objectUUID"] = b"0123456789abcdef"
212         l.add(m)
213         try:
214             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
215             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
216         finally:
217             l.delete(m.dn)
218
219     def test_get_config_basedn(self):
220         l = ldb.Ldb(self.url(), flags=self.flags())
221         self.assertEqual(None, l.get_config_basedn())
222
223     def test_get_root_basedn(self):
224         l = ldb.Ldb(self.url(), flags=self.flags())
225         self.assertEqual(None, l.get_root_basedn())
226
227     def test_get_schema_basedn(self):
228         l = ldb.Ldb(self.url(), flags=self.flags())
229         self.assertEqual(None, l.get_schema_basedn())
230
231     def test_get_default_basedn(self):
232         l = ldb.Ldb(self.url(), flags=self.flags())
233         self.assertEqual(None, l.get_default_basedn())
234
235     def test_add(self):
236         l = ldb.Ldb(self.url(), flags=self.flags())
237         m = ldb.Message()
238         m.dn = ldb.Dn(l, "dc=foo4")
239         m["bla"] = b"bla"
240         m["objectUUID"] = b"0123456789abcdef"
241         self.assertEqual(len(l.search()), 0)
242         l.add(m)
243         try:
244             self.assertEqual(len(l.search()), 1)
245         finally:
246             l.delete(ldb.Dn(l, "dc=foo4"))
247
248     def test_search_iterator(self):
249         l = ldb.Ldb(self.url(), flags=self.flags())
250         s = l.search_iterator()
251         s.abandon()
252         try:
253             for me in s:
254                 self.fail()
255             self.fail()
256         except RuntimeError as re:
257             pass
258         try:
259             s.abandon()
260             self.fail()
261         except RuntimeError as re:
262             pass
263         try:
264             s.result()
265             self.fail()
266         except RuntimeError as re:
267             pass
268
269         s = l.search_iterator()
270         count = 0
271         for me in s:
272             self.assertTrue(isinstance(me, ldb.Message))
273             count += 1
274         r = s.result()
275         self.assertEqual(len(r), 0)
276         self.assertEqual(count, 0)
277
278         m1 = ldb.Message()
279         m1.dn = ldb.Dn(l, "dc=foo4")
280         m1["bla"] = b"bla"
281         m1["objectUUID"] = b"0123456789abcdef"
282         l.add(m1)
283         try:
284             s = l.search_iterator()
285             msgs = []
286             for me in s:
287                 self.assertTrue(isinstance(me, ldb.Message))
288                 count += 1
289                 msgs.append(me)
290             r = s.result()
291             self.assertEqual(len(r), 0)
292             self.assertEqual(len(msgs), 1)
293             self.assertEqual(msgs[0].dn, m1.dn)
294
295             m2 = ldb.Message()
296             m2.dn = ldb.Dn(l, "dc=foo5")
297             m2["bla"] = b"bla"
298             m2["objectUUID"] = b"0123456789abcdee"
299             l.add(m2)
300
301             s = l.search_iterator()
302             msgs = []
303             for me in s:
304                 self.assertTrue(isinstance(me, ldb.Message))
305                 count += 1
306                 msgs.append(me)
307             r = s.result()
308             self.assertEqual(len(r), 0)
309             self.assertEqual(len(msgs), 2)
310             if msgs[0].dn == m1.dn:
311                 self.assertEqual(msgs[0].dn, m1.dn)
312                 self.assertEqual(msgs[1].dn, m2.dn)
313             else:
314                 self.assertEqual(msgs[0].dn, m2.dn)
315                 self.assertEqual(msgs[1].dn, m1.dn)
316
317             s = l.search_iterator()
318             msgs = []
319             for me in s:
320                 self.assertTrue(isinstance(me, ldb.Message))
321                 count += 1
322                 msgs.append(me)
323                 break
324             try:
325                 s.result()
326                 self.fail()
327             except RuntimeError as re:
328                 pass
329             for me in s:
330                 self.assertTrue(isinstance(me, ldb.Message))
331                 count += 1
332                 msgs.append(me)
333                 break
334             for me in s:
335                 self.fail()
336
337             r = s.result()
338             self.assertEqual(len(r), 0)
339             self.assertEqual(len(msgs), 2)
340             if msgs[0].dn == m1.dn:
341                 self.assertEqual(msgs[0].dn, m1.dn)
342                 self.assertEqual(msgs[1].dn, m2.dn)
343             else:
344                 self.assertEqual(msgs[0].dn, m2.dn)
345                 self.assertEqual(msgs[1].dn, m1.dn)
346         finally:
347             l.delete(ldb.Dn(l, "dc=foo4"))
348             l.delete(ldb.Dn(l, "dc=foo5"))
349
350     def test_add_text(self):
351         l = ldb.Ldb(self.url(), flags=self.flags())
352         m = ldb.Message()
353         m.dn = ldb.Dn(l, "dc=foo4")
354         m["bla"] = "bla"
355         m["objectUUID"] = b"0123456789abcdef"
356         self.assertEqual(len(l.search()), 0)
357         l.add(m)
358         try:
359             self.assertEqual(len(l.search()), 1)
360         finally:
361             l.delete(ldb.Dn(l, "dc=foo4"))
362
363     def test_add_w_unhandled_ctrl(self):
364         l = ldb.Ldb(self.url(), flags=self.flags())
365         m = ldb.Message()
366         m.dn = ldb.Dn(l, "dc=foo4")
367         m["bla"] = b"bla"
368         self.assertEqual(len(l.search()), 0)
369         self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
370
371     def test_add_dict(self):
372         l = ldb.Ldb(self.url(), flags=self.flags())
373         m = {"dn": ldb.Dn(l, "dc=foo5"),
374              "bla": b"bla",
375              "objectUUID": b"0123456789abcdef"}
376         self.assertEqual(len(l.search()), 0)
377         l.add(m)
378         try:
379             self.assertEqual(len(l.search()), 1)
380         finally:
381             l.delete(ldb.Dn(l, "dc=foo5"))
382
383     def test_add_dict_text(self):
384         l = ldb.Ldb(self.url(), flags=self.flags())
385         m = {"dn": ldb.Dn(l, "dc=foo5"),
386              "bla": "bla",
387              "objectUUID": b"0123456789abcdef"}
388         self.assertEqual(len(l.search()), 0)
389         l.add(m)
390         try:
391             self.assertEqual(len(l.search()), 1)
392         finally:
393             l.delete(ldb.Dn(l, "dc=foo5"))
394
395     def test_add_dict_string_dn(self):
396         l = ldb.Ldb(self.url(), flags=self.flags())
397         m = {"dn": "dc=foo6", "bla": b"bla",
398              "objectUUID": b"0123456789abcdef"}
399         self.assertEqual(len(l.search()), 0)
400         l.add(m)
401         try:
402             self.assertEqual(len(l.search()), 1)
403         finally:
404             l.delete(ldb.Dn(l, "dc=foo6"))
405
406     def test_add_dict_bytes_dn(self):
407         l = ldb.Ldb(self.url(), flags=self.flags())
408         m = {"dn": b"dc=foo6", "bla": b"bla",
409              "objectUUID": b"0123456789abcdef"}
410         self.assertEqual(len(l.search()), 0)
411         l.add(m)
412         try:
413             self.assertEqual(len(l.search()), 1)
414         finally:
415             l.delete(ldb.Dn(l, "dc=foo6"))
416
417     def test_rename(self):
418         l = ldb.Ldb(self.url(), flags=self.flags())
419         m = ldb.Message()
420         m.dn = ldb.Dn(l, "dc=foo7")
421         m["bla"] = b"bla"
422         m["objectUUID"] = b"0123456789abcdef"
423         self.assertEqual(len(l.search()), 0)
424         l.add(m)
425         try:
426             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
427             self.assertEqual(len(l.search()), 1)
428         finally:
429             l.delete(ldb.Dn(l, "dc=bar"))
430
431     def test_rename_string_dns(self):
432         l = ldb.Ldb(self.url(), flags=self.flags())
433         m = ldb.Message()
434         m.dn = ldb.Dn(l, "dc=foo8")
435         m["bla"] = b"bla"
436         m["objectUUID"] = b"0123456789abcdef"
437         self.assertEqual(len(l.search()), 0)
438         l.add(m)
439         self.assertEqual(len(l.search()), 1)
440         try:
441             l.rename("dc=foo8", "dc=bar")
442             self.assertEqual(len(l.search()), 1)
443         finally:
444             l.delete(ldb.Dn(l, "dc=bar"))
445
446     def test_rename_bad_string_dns(self):
447         l = ldb.Ldb(self.url(), flags=self.flags())
448         m = ldb.Message()
449         m.dn = ldb.Dn(l, "dc=foo8")
450         m["bla"] = b"bla"
451         m["objectUUID"] = b"0123456789abcdef"
452         self.assertEqual(len(l.search()), 0)
453         l.add(m)
454         self.assertEqual(len(l.search()), 1)
455         self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
456         self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
457         l.delete(ldb.Dn(l, "dc=foo8"))
458
459     def test_empty_dn(self):
460         l = ldb.Ldb(self.url(), flags=self.flags())
461         self.assertEqual(0, len(l.search()))
462         m = ldb.Message()
463         m.dn = ldb.Dn(l, "dc=empty")
464         m["objectUUID"] = b"0123456789abcdef"
465         l.add(m)
466         rm = l.search()
467         self.assertEqual(1, len(rm))
468         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
469                          set(rm[0].keys()))
470
471         rm = l.search(m.dn)
472         self.assertEqual(1, len(rm))
473         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
474                          set(rm[0].keys()))
475         rm = l.search(m.dn, attrs=["blah"])
476         self.assertEqual(1, len(rm))
477         self.assertEqual(0, len(rm[0]))
478
479     def test_modify_delete(self):
480         l = ldb.Ldb(self.url(), flags=self.flags())
481         m = ldb.Message()
482         m.dn = ldb.Dn(l, "dc=modifydelete")
483         m["bla"] = [b"1234"]
484         m["objectUUID"] = b"0123456789abcdef"
485         l.add(m)
486         rm = l.search(m.dn)[0]
487         self.assertEqual([b"1234"], list(rm["bla"]))
488         try:
489             m = ldb.Message()
490             m.dn = ldb.Dn(l, "dc=modifydelete")
491             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
492             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
493             l.modify(m)
494             rm = l.search(m.dn)
495             self.assertEqual(1, len(rm))
496             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
497                              set(rm[0].keys()))
498             rm = l.search(m.dn, attrs=["bla"])
499             self.assertEqual(1, len(rm))
500             self.assertEqual(0, len(rm[0]))
501         finally:
502             l.delete(ldb.Dn(l, "dc=modifydelete"))
503
504     def test_modify_delete_text(self):
505         l = ldb.Ldb(self.url(), flags=self.flags())
506         m = ldb.Message()
507         m.dn = ldb.Dn(l, "dc=modifydelete")
508         m.text["bla"] = ["1234"]
509         m["objectUUID"] = b"0123456789abcdef"
510         l.add(m)
511         rm = l.search(m.dn)[0]
512         self.assertEqual(["1234"], list(rm.text["bla"]))
513         try:
514             m = ldb.Message()
515             m.dn = ldb.Dn(l, "dc=modifydelete")
516             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
517             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
518             l.modify(m)
519             rm = l.search(m.dn)
520             self.assertEqual(1, len(rm))
521             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
522                              set(rm[0].keys()))
523             rm = l.search(m.dn, attrs=["bla"])
524             self.assertEqual(1, len(rm))
525             self.assertEqual(0, len(rm[0]))
526         finally:
527             l.delete(ldb.Dn(l, "dc=modifydelete"))
528
529     def test_modify_add(self):
530         l = ldb.Ldb(self.url(), flags=self.flags())
531         m = ldb.Message()
532         m.dn = ldb.Dn(l, "dc=add")
533         m["bla"] = [b"1234"]
534         m["objectUUID"] = b"0123456789abcdef"
535         l.add(m)
536         try:
537             m = ldb.Message()
538             m.dn = ldb.Dn(l, "dc=add")
539             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
540             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
541             l.modify(m)
542             rm = l.search(m.dn)[0]
543             self.assertEqual(3, len(rm))
544             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
545         finally:
546             l.delete(ldb.Dn(l, "dc=add"))
547
548     def test_modify_add_text(self):
549         l = ldb.Ldb(self.url(), flags=self.flags())
550         m = ldb.Message()
551         m.dn = ldb.Dn(l, "dc=add")
552         m.text["bla"] = ["1234"]
553         m["objectUUID"] = b"0123456789abcdef"
554         l.add(m)
555         try:
556             m = ldb.Message()
557             m.dn = ldb.Dn(l, "dc=add")
558             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
559             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
560             l.modify(m)
561             rm = l.search(m.dn)[0]
562             self.assertEqual(3, len(rm))
563             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
564         finally:
565             l.delete(ldb.Dn(l, "dc=add"))
566
567     def test_modify_replace(self):
568         l = ldb.Ldb(self.url(), flags=self.flags())
569         m = ldb.Message()
570         m.dn = ldb.Dn(l, "dc=modify2")
571         m["bla"] = [b"1234", b"456"]
572         m["objectUUID"] = b"0123456789abcdef"
573         l.add(m)
574         try:
575             m = ldb.Message()
576             m.dn = ldb.Dn(l, "dc=modify2")
577             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
578             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
579             l.modify(m)
580             rm = l.search(m.dn)[0]
581             self.assertEqual(3, len(rm))
582             self.assertEqual([b"789"], list(rm["bla"]))
583             rm = l.search(m.dn, attrs=["bla"])[0]
584             self.assertEqual(1, len(rm))
585         finally:
586             l.delete(ldb.Dn(l, "dc=modify2"))
587
588     def test_modify_replace_text(self):
589         l = ldb.Ldb(self.url(), flags=self.flags())
590         m = ldb.Message()
591         m.dn = ldb.Dn(l, "dc=modify2")
592         m.text["bla"] = ["1234", "456"]
593         m["objectUUID"] = b"0123456789abcdef"
594         l.add(m)
595         try:
596             m = ldb.Message()
597             m.dn = ldb.Dn(l, "dc=modify2")
598             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
599             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
600             l.modify(m)
601             rm = l.search(m.dn)[0]
602             self.assertEqual(3, len(rm))
603             self.assertEqual(["789"], list(rm.text["bla"]))
604             rm = l.search(m.dn, attrs=["bla"])[0]
605             self.assertEqual(1, len(rm))
606         finally:
607             l.delete(ldb.Dn(l, "dc=modify2"))
608
609     def test_modify_flags_change(self):
610         l = ldb.Ldb(self.url(), flags=self.flags())
611         m = ldb.Message()
612         m.dn = ldb.Dn(l, "dc=add")
613         m["bla"] = [b"1234"]
614         m["objectUUID"] = b"0123456789abcdef"
615         l.add(m)
616         try:
617             m = ldb.Message()
618             m.dn = ldb.Dn(l, "dc=add")
619             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
620             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
621             l.modify(m)
622             rm = l.search(m.dn)[0]
623             self.assertEqual(3, len(rm))
624             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
625
626             # Now create another modify, but switch the flags before we do it
627             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
628             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
629             l.modify(m)
630             rm = l.search(m.dn, attrs=["bla"])[0]
631             self.assertEqual(1, len(rm))
632             self.assertEqual([b"1234"], list(rm["bla"]))
633         finally:
634             l.delete(ldb.Dn(l, "dc=add"))
635
636     def test_modify_flags_change_text(self):
637         l = ldb.Ldb(self.url(), flags=self.flags())
638         m = ldb.Message()
639         m.dn = ldb.Dn(l, "dc=add")
640         m.text["bla"] = ["1234"]
641         m["objectUUID"] = b"0123456789abcdef"
642         l.add(m)
643         try:
644             m = ldb.Message()
645             m.dn = ldb.Dn(l, "dc=add")
646             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
647             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
648             l.modify(m)
649             rm = l.search(m.dn)[0]
650             self.assertEqual(3, len(rm))
651             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
652
653             # Now create another modify, but switch the flags before we do it
654             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
655             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
656             l.modify(m)
657             rm = l.search(m.dn, attrs=["bla"])[0]
658             self.assertEqual(1, len(rm))
659             self.assertEqual(["1234"], list(rm.text["bla"]))
660         finally:
661             l.delete(ldb.Dn(l, "dc=add"))
662
663     def test_transaction_commit(self):
664         l = ldb.Ldb(self.url(), flags=self.flags())
665         l.transaction_start()
666         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
667         m["foo"] = [b"bar"]
668         m["objectUUID"] = b"0123456789abcdef"
669         l.add(m)
670         l.transaction_commit()
671         l.delete(m.dn)
672
673     def test_transaction_cancel(self):
674         l = ldb.Ldb(self.url(), flags=self.flags())
675         l.transaction_start()
676         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
677         m["foo"] = [b"bar"]
678         m["objectUUID"] = b"0123456789abcdee"
679         l.add(m)
680         l.transaction_cancel()
681         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
682
683     def test_set_debug(self):
684         def my_report_fn(level, text):
685             pass
686         l = ldb.Ldb(self.url(), flags=self.flags())
687         l.set_debug(my_report_fn)
688
689     def test_zero_byte_string(self):
690         """Testing we do not get trapped in the \0 byte in a property string."""
691         l = ldb.Ldb(self.url(), flags=self.flags())
692         l.add({
693             "dn": b"dc=somedn",
694             "objectclass": b"user",
695             "cN": b"LDAPtestUSER",
696             "givenname": b"ldap",
697             "displayname": b"foo\0bar",
698             "objectUUID": b"0123456789abcdef"
699         })
700         res = l.search(expression="(dn=dc=somedn)")
701         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
702
703     def test_no_crash_broken_expr(self):
704         l = ldb.Ldb(self.url(), flags=self.flags())
705         self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
706
707 # Run the SimpleLdb tests against an lmdb backend
708
709
class SimpleLdbLmdb(SimpleLdb):
    """Run every SimpleLdb test with the MDB prefix and index record."""

    def setUp(self):
        # HAVE_LMDB defaults to '1'; only an explicit '0' skips the tests.
        lmdb_available = os.environ.get('HAVE_LMDB', '1') != '0'
        if not lmdb_available:
            self.skipTest("No lmdb backend")
        # Both attributes are read by the parent setUp(), so set them first.
        self.prefix, self.index = MDB_PREFIX, MDB_INDEX_OBJ
        super(SimpleLdbLmdb, self).setUp()

    def tearDown(self):
        super(SimpleLdbLmdb, self).tearDown()
721
722
class SimpleLdbNoLmdb(LdbBaseTest):
    """Check that opening an mdb:// URL fails when HAVE_LMDB is '0'."""

    def setUp(self):
        # The test only makes sense when lmdb is reported as unavailable.
        if os.environ.get('HAVE_LMDB', '1') != '0':
            self.skipTest("lmdb backend enabled")
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super(SimpleLdbNoLmdb, self).setUp()

    def tearDown(self):
        super(SimpleLdbNoLmdb, self).tearDown()

    def test_lmdb_disabled(self):
        self.testdir = tempdir()
        # tearDown() of this class does not remove the directory, so
        # register the removal here; otherwise each run leaves a temporary
        # directory behind.
        self.addCleanup(shutil.rmtree, self.testdir)
        self.filename = os.path.join(self.testdir, "test.ldb")
        try:
            self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        except ldb.LdbError as err:
            # The first exception argument carries the ldb error code.
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OTHER)
        else:
            self.fail("Should have failed on missing LMDB")
744
745
746 class SearchTests(LdbBaseTest):
747     def tearDown(self):
748         shutil.rmtree(self.testdir)
749         super(SearchTests, self).tearDown()
750
751         # Ensure the LDB is closed now, so we close the FD
752         del(self.l)
753
754     def setUp(self):
755         super(SearchTests, self).setUp()
756         self.testdir = tempdir()
757         self.filename = os.path.join(self.testdir, "search_test.ldb")
758         options = ["modules:rdn_name"]
759         if hasattr(self, 'IDXCHECK'):
760             options.append("disable_full_db_scan_for_self_test:1")
761         self.l = ldb.Ldb(self.url(),
762                          flags=self.flags(),
763                          options=options)
764         try:
765             self.l.add(self.index)
766         except AttributeError:
767             pass
768
769         self.l.add({"dn": "@ATTRIBUTES",
770                     "DC": "CASE_INSENSITIVE"})
771
772         # Note that we can't use the name objectGUID here, as we
773         # want to stay clear of the objectGUID handler in LDB and
774         # instead use just the 16 bytes raw, which we just keep
775         # to printable chars here for ease of handling.
776
777         self.l.add({"dn": "DC=SAMBA,DC=ORG",
778                     "name": b"samba.org",
779                     "objectUUID": b"0123456789abcdef"})
780         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
781                     "name": b"Admins",
782                     "x": "z", "y": "a",
783                     "objectUUID": b"0123456789abcde1"})
784         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
785                     "name": b"Users",
786                     "x": "z", "y": "a",
787                     "objectUUID": b"0123456789abcde2"})
788         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
789                     "name": b"OU #1",
790                     "x": "y", "y": "a",
791                     "objectUUID": b"0123456789abcde3"})
792         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
793                     "name": b"OU #2",
794                     "x": "y", "y": "a",
795                     "objectUUID": b"0123456789abcde4"})
796         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
797                     "name": b"OU #3",
798                     "x": "y", "y": "a",
799                     "objectUUID": b"0123456789abcde5"})
800         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
801                     "name": b"OU #4",
802                     "x": "y", "y": "a",
803                     "objectUUID": b"0123456789abcde6"})
804         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
805                     "name": b"OU #5",
806                     "x": "y", "y": "a",
807                     "objectUUID": b"0123456789abcde7"})
808         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
809                     "name": b"OU #6",
810                     "x": "y", "y": "a",
811                     "objectUUID": b"0123456789abcde8"})
812         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
813                     "name": b"OU #7",
814                     "x": "y", "y": "a",
815                     "objectUUID": b"0123456789abcde9"})
816         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
817                     "name": b"OU #8",
818                     "x": "y", "y": "a",
819                     "objectUUID": b"0123456789abcde0"})
820         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
821                     "name": b"OU #9",
822                     "x": "y", "y": "a",
823                     "objectUUID": b"0123456789abcdea"})
824         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
825                     "name": b"OU #10",
826                     "x": "y", "y": "a",
827                     "objectUUID": b"0123456789abcdeb"})
828         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
829                     "name": b"OU #10",
830                     "x": "y", "y": "a",
831                     "objectUUID": b"0123456789abcdec"})
832         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
833                     "name": b"OU #10",
834                     "x": "y", "y": "b",
835                     "objectUUID": b"0123456789abcded"})
836         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
837                     "name": b"OU #10",
838                     "x": "x", "y": "b",
839                     "objectUUID": b"0123456789abcdee"})
840         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
841                     "name": b"OU #10",
842                     "x": "x", "y": "b",
843                     "objectUUID": b"0123456789abcd01"})
844         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
845                     "name": b"OU #10",
846                     "x": "x", "y": "b",
847                     "objectUUID": b"0123456789abcd02"})
848         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
849                     "name": b"OU #10",
850                     "x": "x", "y": "b",
851                     "objectUUID": b"0123456789abcd03"})
852         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
853                     "name": b"OU #10",
854                     "x": "x", "y": "b",
855                     "objectUUID": b"0123456789abcd04"})
856         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
857                     "name": b"OU #10",
858                     "x": "x", "y": "b",
859                     "objectUUID": b"0123456789abcd05"})
860         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
861                     "name": b"OU #10",
862                     "x": "x", "y": "b",
863                     "objectUUID": b"0123456789abcd06"})
864         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
865                     "name": b"OU #10",
866                     "x": "x", "y": "b",
867                     "objectUUID": b"0123456789abcd07"})
868         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
869                     "name": b"OU #10",
870                     "x": "x", "y": "c",
871                     "objectUUID": b"0123456789abcd08"})
872         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
873                     "name": b"OU #10",
874                     "x": "x", "y": "c",
875                     "objectUUID": b"0123456789abcd09"})
876
877     def test_base(self):
878         """Testing a search"""
879
880         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
881                               scope=ldb.SCOPE_BASE)
882         self.assertEqual(len(res11), 1)
883
884     def test_base_lower(self):
885         """Testing a search"""
886
887         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
888                               scope=ldb.SCOPE_BASE)
889         self.assertEqual(len(res11), 1)
890
891     def test_base_or(self):
892         """Testing a search"""
893
894         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
895                               scope=ldb.SCOPE_BASE,
896                               expression="(|(ou=ou11)(ou=ou12))")
897         self.assertEqual(len(res11), 1)
898
899     def test_base_or2(self):
900         """Testing a search"""
901
902         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
903                               scope=ldb.SCOPE_BASE,
904                               expression="(|(x=y)(y=b))")
905         self.assertEqual(len(res11), 1)
906
907     def test_base_and(self):
908         """Testing a search"""
909
910         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
911                               scope=ldb.SCOPE_BASE,
912                               expression="(&(ou=ou11)(ou=ou12))")
913         self.assertEqual(len(res11), 0)
914
915     def test_base_and2(self):
916         """Testing a search"""
917
918         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
919                               scope=ldb.SCOPE_BASE,
920                               expression="(&(x=y)(y=a))")
921         self.assertEqual(len(res11), 1)
922
923     def test_base_false(self):
924         """Testing a search"""
925
926         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
927                               scope=ldb.SCOPE_BASE,
928                               expression="(|(ou=ou13)(ou=ou12))")
929         self.assertEqual(len(res11), 0)
930
931     def test_check_base_false(self):
932         """Testing a search"""
933         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
934                               scope=ldb.SCOPE_BASE,
935                               expression="(|(ou=ou13)(ou=ou12))")
936         self.assertEqual(len(res11), 0)
937
938     def test_check_base_error(self):
939         """Testing a search"""
940         checkbaseonsearch = {"dn": "@OPTIONS",
941                              "checkBaseOnSearch": b"TRUE"}
942         try:
943             self.l.add(checkbaseonsearch)
944         except ldb.LdbError as err:
945             enum = err.args[0]
946             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
947             m = ldb.Message.from_dict(self.l,
948                                       checkbaseonsearch)
949             self.l.modify(m)
950
951         try:
952             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
953                                   scope=ldb.SCOPE_BASE,
954                                   expression="(|(ou=ou13)(ou=ou12))")
955             self.fail("Should have failed on missing base")
956         except ldb.LdbError as err:
957             enum = err.args[0]
958             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
959
960     def test_subtree_and(self):
961         """Testing a search"""
962
963         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
964                               scope=ldb.SCOPE_SUBTREE,
965                               expression="(&(ou=ou11)(ou=ou12))")
966         self.assertEqual(len(res11), 0)
967
968     def test_subtree_and2(self):
969         """Testing a search"""
970
971         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
972                               scope=ldb.SCOPE_SUBTREE,
973                               expression="(&(x=y)(|(y=b)(y=c)))")
974         self.assertEqual(len(res11), 1)
975
976     def test_subtree_and2_lower(self):
977         """Testing a search"""
978
979         res11 = self.l.search(base="DC=samba,DC=org",
980                               scope=ldb.SCOPE_SUBTREE,
981                               expression="(&(x=y)(|(y=b)(y=c)))")
982         self.assertEqual(len(res11), 1)
983
984     def test_subtree_or(self):
985         """Testing a search"""
986
987         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
988                               scope=ldb.SCOPE_SUBTREE,
989                               expression="(|(ou=ou11)(ou=ou12))")
990         self.assertEqual(len(res11), 2)
991
992     def test_subtree_or2(self):
993         """Testing a search"""
994
995         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
996                               scope=ldb.SCOPE_SUBTREE,
997                               expression="(|(x=y)(y=b))")
998         self.assertEqual(len(res11), 20)
999
1000     def test_subtree_or3(self):
1001         """Testing a search"""
1002
1003         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1004                               scope=ldb.SCOPE_SUBTREE,
1005                               expression="(|(x=y)(y=b)(y=c))")
1006         self.assertEqual(len(res11), 22)
1007
1008     def test_one_and(self):
1009         """Testing a search"""
1010
1011         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1012                               scope=ldb.SCOPE_ONELEVEL,
1013                               expression="(&(ou=ou11)(ou=ou12))")
1014         self.assertEqual(len(res11), 0)
1015
1016     def test_one_and2(self):
1017         """Testing a search"""
1018
1019         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1020                               scope=ldb.SCOPE_ONELEVEL,
1021                               expression="(&(x=y)(y=b))")
1022         self.assertEqual(len(res11), 1)
1023
1024     def test_one_or(self):
1025         """Testing a search"""
1026
1027         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1028                               scope=ldb.SCOPE_ONELEVEL,
1029                               expression="(|(ou=ou11)(ou=ou12))")
1030         self.assertEqual(len(res11), 2)
1031
1032     def test_one_or2(self):
1033         """Testing a search"""
1034
1035         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1036                               scope=ldb.SCOPE_ONELEVEL,
1037                               expression="(|(x=y)(y=b))")
1038         self.assertEqual(len(res11), 20)
1039
1040     def test_one_or2_lower(self):
1041         """Testing a search"""
1042
1043         res11 = self.l.search(base="DC=samba,DC=org",
1044                               scope=ldb.SCOPE_ONELEVEL,
1045                               expression="(|(x=y)(y=b))")
1046         self.assertEqual(len(res11), 20)
1047
1048     def test_one_unindexable(self):
1049         """Testing a search"""
1050
1051         try:
1052             res11 = self.l.search(base="DC=samba,DC=org",
1053                                   scope=ldb.SCOPE_ONELEVEL,
1054                                   expression="(y=b*)")
1055             if hasattr(self, 'IDX') and \
1056                not hasattr(self, 'IDXONE') and \
1057                hasattr(self, 'IDXCHECK'):
1058                 self.fail("Should have failed as un-indexed search")
1059
1060             self.assertEqual(len(res11), 9)
1061
1062         except ldb.LdbError as err:
1063             enum = err.args[0]
1064             estr = err.args[1]
1065             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1066             self.assertIn(estr, "ldb FULL SEARCH disabled")
1067
1068     def test_one_unindexable_presence(self):
1069         """Testing a search"""
1070
1071         try:
1072             res11 = self.l.search(base="DC=samba,DC=org",
1073                                   scope=ldb.SCOPE_ONELEVEL,
1074                                   expression="(y=*)")
1075             if hasattr(self, 'IDX') and \
1076                not hasattr(self, 'IDXONE') and \
1077                hasattr(self, 'IDXCHECK'):
1078                 self.fail("Should have failed as un-indexed search")
1079
1080             self.assertEqual(len(res11), 24)
1081
1082         except ldb.LdbError as err:
1083             enum = err.args[0]
1084             estr = err.args[1]
1085             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1086             self.assertIn(estr, "ldb FULL SEARCH disabled")
1087
1088     def test_subtree_and_or(self):
1089         """Testing a search"""
1090
1091         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1092                               scope=ldb.SCOPE_SUBTREE,
1093                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1094         self.assertEqual(len(res11), 0)
1095
1096     def test_subtree_and_or2(self):
1097         """Testing a search"""
1098
1099         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1100                               scope=ldb.SCOPE_SUBTREE,
1101                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1102         self.assertEqual(len(res11), 0)
1103
1104     def test_subtree_and_or3(self):
1105         """Testing a search"""
1106
1107         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1108                               scope=ldb.SCOPE_SUBTREE,
1109                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1110         self.assertEqual(len(res11), 2)
1111
1112     def test_subtree_and_or4(self):
1113         """Testing a search"""
1114
1115         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1116                               scope=ldb.SCOPE_SUBTREE,
1117                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1118         self.assertEqual(len(res11), 2)
1119
1120     def test_subtree_and_or5(self):
1121         """Testing a search"""
1122
1123         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1124                               scope=ldb.SCOPE_SUBTREE,
1125                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1126         self.assertEqual(len(res11), 1)
1127
1128     def test_subtree_or_and(self):
1129         """Testing a search"""
1130
1131         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1132                               scope=ldb.SCOPE_SUBTREE,
1133                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1134         self.assertEqual(len(res11), 10)
1135
1136     def test_subtree_large_and_unique(self):
1137         """Testing a search"""
1138
1139         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1140                               scope=ldb.SCOPE_SUBTREE,
1141                               expression="(&(ou=ou10)(y=a))")
1142         self.assertEqual(len(res11), 1)
1143
1144     def test_subtree_and_none(self):
1145         """Testing a search"""
1146
1147         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1148                               scope=ldb.SCOPE_SUBTREE,
1149                               expression="(&(ou=ouX)(y=a))")
1150         self.assertEqual(len(res11), 0)
1151
1152     def test_subtree_and_idx_record(self):
1153         """Testing a search against the index record"""
1154
1155         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1156                               scope=ldb.SCOPE_SUBTREE,
1157                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1158         self.assertEqual(len(res11), 0)
1159
1160     def test_subtree_and_idxone_record(self):
1161         """Testing a search against the index record"""
1162
1163         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1164                               scope=ldb.SCOPE_SUBTREE,
1165                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1166         self.assertEqual(len(res11), 0)
1167
1168     def test_subtree_unindexable(self):
1169         """Testing a search"""
1170
1171         try:
1172             res11 = self.l.search(base="DC=samba,DC=org",
1173                                   scope=ldb.SCOPE_SUBTREE,
1174                                   expression="(y=b*)")
1175             if hasattr(self, 'IDX') and \
1176                hasattr(self, 'IDXCHECK'):
1177                 self.fail("Should have failed as un-indexed search")
1178
1179             self.assertEqual(len(res11), 9)
1180
1181         except ldb.LdbError as err:
1182             enum = err.args[0]
1183             estr = err.args[1]
1184             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1185             self.assertIn(estr, "ldb FULL SEARCH disabled")
1186
1187     def test_subtree_unindexable_presence(self):
1188         """Testing a search"""
1189
1190         try:
1191             res11 = self.l.search(base="DC=samba,DC=org",
1192                                   scope=ldb.SCOPE_SUBTREE,
1193                                   expression="(y=*)")
1194             if hasattr(self, 'IDX') and \
1195                hasattr(self, 'IDXCHECK'):
1196                 self.fail("Should have failed as un-indexed search")
1197
1198             self.assertEqual(len(res11), 24)
1199
1200         except ldb.LdbError as err:
1201             enum = err.args[0]
1202             estr = err.args[1]
1203             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1204             self.assertIn(estr, "ldb FULL SEARCH disabled")
1205
1206     def test_dn_filter_one(self):
1207         """Testing that a dn= filter succeeds
1208         (or fails with disallowDNFilter
1209         set and IDXGUID or (IDX and not IDXONE) mode)
1210         when the scope is SCOPE_ONELEVEL.
1211
1212         This should be made more consistent, but for now lock in
1213         the behaviour
1214
1215         """
1216
1217         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1218                               scope=ldb.SCOPE_ONELEVEL,
1219                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1220         if hasattr(self, 'disallowDNFilter') and \
1221            hasattr(self, 'IDX') and \
1222            (hasattr(self, 'IDXGUID') or
1223             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1224             self.assertEqual(len(res11), 0)
1225         else:
1226             self.assertEqual(len(res11), 1)
1227
1228     def test_dn_filter_subtree(self):
1229         """Testing that a dn= filter succeeds
1230         (or fails with disallowDNFilter set)
1231         when the scope is SCOPE_SUBTREE"""
1232
1233         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1234                               scope=ldb.SCOPE_SUBTREE,
1235                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1236         if hasattr(self, 'disallowDNFilter') \
1237            and hasattr(self, 'IDX'):
1238             self.assertEqual(len(res11), 0)
1239         else:
1240             self.assertEqual(len(res11), 1)
1241
1242     def test_dn_filter_base(self):
1243         """Testing that (incorrectly) a dn= filter works
1244         when the scope is SCOPE_BASE"""
1245
1246         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1247                               scope=ldb.SCOPE_BASE,
1248                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1249
1250         # At some point we should fix this, but it isn't trivial
1251         self.assertEqual(len(res11), 1)
1252
1253     def test_distinguishedName_filter_one(self):
1254         """Testing that a distinguishedName= filter succeeds
1255         when the scope is SCOPE_ONELEVEL.
1256
1257         This should be made more consistent, but for now lock in
1258         the behaviour
1259
1260         """
1261
1262         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1263                               scope=ldb.SCOPE_ONELEVEL,
1264                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1265         self.assertEqual(len(res11), 1)
1266
1267     def test_distinguishedName_filter_subtree(self):
1268         """Testing that a distinguishedName= filter succeeds
1269         when the scope is SCOPE_SUBTREE"""
1270
1271         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1272                               scope=ldb.SCOPE_SUBTREE,
1273                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1274         self.assertEqual(len(res11), 1)
1275
1276     def test_distinguishedName_filter_base(self):
1277         """Testing that (incorrectly) a distinguishedName= filter works
1278         when the scope is SCOPE_BASE"""
1279
1280         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1281                               scope=ldb.SCOPE_BASE,
1282                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1283
1284         # At some point we should fix this, but it isn't trivial
1285         self.assertEqual(len(res11), 1)
1286
1287     def test_bad_dn_filter_base(self):
1288         """Testing that a dn= filter on an invalid DN works
1289         when the scope is SCOPE_BASE but
1290         returns zero results"""
1291
1292         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1293                               scope=ldb.SCOPE_BASE,
1294                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1295
1296         # At some point we should fix this, but it isn't trivial
1297         self.assertEqual(len(res11), 0)
1298
1299
1300     def test_bad_dn_filter_one(self):
1301         """Testing that a dn= filter succeeds but returns zero
1302         results when the DN is not valid on a SCOPE_ONELEVEL search
1303
1304         """
1305
1306         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1307                               scope=ldb.SCOPE_ONELEVEL,
1308                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1309         self.assertEqual(len(res11), 0)
1310
1311     def test_bad_dn_filter_subtree(self):
1312         """Testing that a dn= filter succeeds but returns zero
1313         results when the DN is not valid on a SCOPE_SUBTREE search
1314
1315         """
1316
1317         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1318                               scope=ldb.SCOPE_SUBTREE,
1319                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1320         self.assertEqual(len(res11), 0)
1321
1322     def test_bad_distinguishedName_filter_base(self):
1323         """Testing that a distinguishedName= filter on an invalid DN works
1324         when the scope is SCOPE_BASE but
1325         returns zero results"""
1326
1327         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1328                               scope=ldb.SCOPE_BASE,
1329                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1330
1331         # At some point we should fix this, but it isn't trivial
1332         self.assertEqual(len(res11), 0)
1333
1334
1335     def test_bad_distinguishedName_filter_one(self):
1336         """Testing that a distinguishedName= filter succeeds but returns zero
1337         results when the DN is not valid on a SCOPE_ONELEVEL search
1338
1339         """
1340
1341         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1342                               scope=ldb.SCOPE_ONELEVEL,
1343                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1344         self.assertEqual(len(res11), 0)
1345
1346     def test_bad_distinguishedName_filter_subtree(self):
1347         """Testing that a distinguishedName= filter succeeds but returns zero
1348         results when the DN is not valid on a SCOPE_SUBTREE search
1349
1350         """
1351
1352         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1353                               scope=ldb.SCOPE_SUBTREE,
1354                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1355         self.assertEqual(len(res11), 0)
1356
1357     def test_bad_dn_search_base(self):
1358         """Testing with a bad base DN (SCOPE_BASE)"""
1359
1360         try:
1361             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1362                                   scope=ldb.SCOPE_BASE)
1363             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1364         except ldb.LdbError as err:
1365             enum = err.args[0]
1366             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1367
1368
1369     def test_bad_dn_search_one(self):
1370         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1371
1372         try:
1373             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1374                                   scope=ldb.SCOPE_ONELEVEL)
1375             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1376         except ldb.LdbError as err:
1377             enum = err.args[0]
1378             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1379
1380     def test_bad_dn_search_subtree(self):
1381         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1382
1383         try:
1384             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1385                                   scope=ldb.SCOPE_SUBTREE)
1386             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1387         except ldb.LdbError as err:
1388             enum = err.args[0]
1389             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1390
1391
1392
1393 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """The SearchTests suite, run with the mdb:// URL prefix and the
       MDB index record"""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against.
        have_lmdb = os.environ.get('HAVE_LMDB', '1')
        if have_lmdb == '0':
            self.skipTest("No lmdb backend")
        # Both attributes are assigned before the inherited setUp runs.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend specific to clean up
        super(SearchTestsLmdb, self).tearDown()
1405
1406
class IndexedSearchTests(SearchTests):
    """Run the search tests with an attribute index on x, y and ou,
       to ensure the index doesn't break things"""

    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attribute; the search tests probe it with hasattr()
        self.IDX = True
1416
1417
class IndexedCheckSearchTests(IndexedSearchTests):
    """Run the indexed search tests with the IDXCHECK marker set
       (full scan disabled)"""

    def setUp(self):
        # Assigned before delegating, so the marker already exists while
        # the inherited setUp runs (presumably it is consulted there --
        # TODO confirm against SearchTests.setUp).
        self.IDXCHECK = True
        super(IndexedCheckSearchTests, self).setUp()
1425
1426
class IndexedSearchDnFilterTests(SearchTests):
    """Run the search tests with an attribute index and with
       disallowDNFilter set in @OPTIONS"""

    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        self.IDX = True
1440
1441
class IndexedAndOneLevelSearchTests(SearchTests):
    """Run the search tests with an attribute index that also enables
       @IDXONE, to ensure the index doesn't break things"""

    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Marker attributes; the search tests probe them with hasattr()
        self.IDX = True
        self.IDXONE = True
1453
1454
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """Run the indexed + @IDXONE search tests with the IDXCHECK marker
       set (full scan disabled)"""

    def setUp(self):
        # Assigned before delegating, so the marker already exists while
        # the inherited setUp runs (presumably it is consulted there --
        # TODO confirm against SearchTests.setUp).
        self.IDXCHECK = True
        super(IndexedCheckedAndOneLevelSearchTests, self).setUp()
1462
1463
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Run the search tests with an attribute index including @IDXONE
       and with disallowDNFilter and checkBaseOnSearch set in @OPTIONS"""

    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        self.IDX = True
        self.IDXONE = True
1481
1482
class GUIDIndexedSearchTests(SearchTests):
    """Run the search tests with a GUID index keyed on objectUUID,
       to ensure the index doesn't break things"""

    def setUp(self):
        # The index record is stored on the instance before the
        # inherited setUp runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedSearchTests, self).setUp()

        # Marker attributes; the search tests probe them with hasattr()
        self.IDXGUID = True
        self.IDXONE = True
1496
1497
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Run the search tests with a GUID index keyed on objectUUID and
       with disallowDNFilter and checkBaseOnSearch set in @OPTIONS"""

    def setUp(self):
        # The index record is stored on the instance before the
        # inherited setUp runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Marker attributes; the search tests probe them with hasattr()
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXGUID = True
1515
1516
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Run the search tests with a GUID index keyed on objectUUID, the
       IDXONE marker set, and disallowDNFilter and checkBaseOnSearch
       set in @OPTIONS"""

    def setUp(self):
        # The index record is stored on the instance before the
        # inherited setUp runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Marker attributes; the search tests probe them with hasattr()
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXGUID = True
        self.IDXONE = True
1535
1536
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """The GUIDIndexedSearchTests suite, run with the mdb:// URL
       prefix"""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against.
        have_lmdb = os.environ.get('HAVE_LMDB', '1')
        if have_lmdb == '0':
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend specific to clean up
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1547
1548
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """The GUIDIndexedDNFilterSearchTests suite, run with the mdb://
       URL prefix"""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against.
        have_lmdb = os.environ.get('HAVE_LMDB', '1')
        if have_lmdb == '0':
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend specific to clean up
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1559
1560
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """The GUIDAndOneLevelIndexedSearchTests suite, run with the
       mdb:// URL prefix"""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against.
        have_lmdb = os.environ.get('HAVE_LMDB', '1')
        if have_lmdb == '0':
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend specific to clean up
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1571
1572
class AddModifyTests(LdbBaseTest):
    """Add, delete and rename tests on a database using rdn_name.

    setUp() creates a fresh database containing DC=SAMBA,DC=ORG and an
    @ATTRIBUTES record marking objectUUID as UNIQUE_INDEX.  A subclass
    may assign ``self.index`` before calling setUp() to have that record
    added first.
    """

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def setUp(self):
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])

        # Only some subclasses define an index record.  Check for the
        # attribute explicitly: wrapping the add() call in
        # "except AttributeError" would also swallow an AttributeError
        # raised from inside add() itself.
        if hasattr(self, "index"):
            self.l.add(self.index)

        self.l.add({"dn": "DC=SAMBA,DC=ORG",
                    "name": b"samba.org",
                    "objectUUID": b"0123456789abcdef"})
        self.l.add({"dn": "@ATTRIBUTES",
                    "objectUUID": "UNIQUE_INDEX"})

    def _add_ou(self, ou, guid):
        """Add OU=<ou>,DC=SAMBA,DC=ORG carrying objectUUID *guid*."""
        self.l.add({"dn": "OU=%s,DC=SAMBA,DC=ORG" % ou,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid})

    def _assert_guid_at(self, guid, expected_dn):
        """Assert that a subtree search for objectUUID *guid* returns
        exactly one entry and that its DN is *expected_dn*."""
        res = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression="(objectUUID=%s)" % guid)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), expected_dn)

    def test_add_dup(self):
        self._add_ou("DUP", b"0123456789abcde1")
        try:
            # Same DN, different GUID
            self._add_ou("DUP", b"0123456789abcde2")
            self.fail("Should have failed adding duplicate entry")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

    def test_add_bad(self):
        try:
            # The first component has no "attr=" part
            self.l.add({"dn": "BAD,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcde1"})
            self.fail("Should have failed adding entry with invalid DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)

    def test_add_del_add(self):
        self._add_ou("DUP", b"0123456789abcde1")
        self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
        # The DN is free again, so adding it with a new GUID must work
        self._add_ou("DUP", b"0123456789abcde2")

    def test_add_move_add(self):
        self._add_ou("DUP", b"0123456789abcde1")
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
        # The old DN is free again after the rename
        self._add_ou("DUP", b"0123456789abcde2")

    def test_add_move_fail_move_move(self):
        self._add_ou("DUP", b"0123456789abcde1")
        self._add_ou("DUP2", b"0123456789abcde2")

        self._assert_guid_at("0123456789abcde1", "OU=DUP,DC=SAMBA,DC=ORG")
        self._assert_guid_at("0123456789abcde2", "OU=DUP2,DC=SAMBA,DC=ORG")

        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on duplicate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        # Move the blocking entry away, then retry the rename that failed
        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")

        # Each GUID must now be found at its new DN
        self._assert_guid_at("0123456789abcde1", "OU=DUP2,DC=SAMBA,DC=ORG")
        self._assert_guid_at("0123456789abcde2", "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on missing")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)

    def test_move_missing2(self):
        # The rename target exists, but the source still does not
        self._add_ou("DUP2", b"0123456789abcde2")

        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on missing")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)

    def test_move_bad(self):
        self._add_ou("DUP2", b"0123456789abcde2")

        try:
            # Malformed source DN
            self.l.rename("OUXDUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on invalid DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)

    def test_move_bad2(self):
        self._add_ou("DUP2", b"0123456789abcde2")

        try:
            # Malformed destination DN
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OUXDUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on invalid DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)

    def test_move_fail_move_add(self):
        self._add_ou("DUP", b"0123456789abcde1")
        self._add_ou("DUP2", b"0123456789abcde2")
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on duplicate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        # OU=DUP2 was vacated by the rename, so a new entry may take it
        self._add_ou("DUP2", b"0123456789abcde3")
1771
1772
class AddModifyTestsLmdb(AddModifyTests):
    """Run the AddModifyTests cases with the mdb:// prefix and the
    MDB_INDEX_OBJ index record."""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against, so skip.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")

        # Both attributes must be in place before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(AddModifyTestsLmdb, self).tearDown()
1784
1785
class IndexedAddModifyTests(AddModifyTests):
    """Repeat the add/modify tests with an index configured, and check
       how duplicate DNs and duplicate objectUUID values are rejected"""

    def setUp(self):
        # A subclass may already have chosen its own @INDEXLIST record;
        # only install the default attribute index when it has not.
        if not hasattr(self, 'index'):
            default_index = {
                "dn": "@INDEXLIST",
                "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
                "@IDXONE": [b"1"],
            }
            self.index = default_index
        super(IndexedAddModifyTests, self).setUp()

    @staticmethod
    def _admins_record(dn, guid):
        # Build the "Admins" record used by most tests in this class.
        return {"dn": dn,
                "name": b"Admins",
                "x": "z", "y": "a",
                "objectUUID": guid}

    def test_duplicate_GUID(self):
        # This objectUUID is already used by DC=SAMBA,DC=ORG (see setUp).
        clash = self._admins_record("OU=DUPGUID,DC=SAMBA,DC=ORG",
                                    b"0123456789abcdef")
        try:
            self.l.add(clash)
            self.fail("Should have failed adding dupliate GUID")
        except ldb.LdbError as err:
            code = err.args[0]
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION)

    def test_duplicate_name_dup_GUID(self):
        target_dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        guid = b"a123456789abcdef"
        self.l.add(self._admins_record(target_dn, guid))
        try:
            # Same DN and same GUID as the entry just added
            self.l.add(self._admins_record(target_dn, guid))
            self.fail("Should have failed adding dupliate GUID")
        except ldb.LdbError as err:
            code = err.args[0]
            self.assertEqual(code, ldb.ERR_ENTRY_ALREADY_EXISTS)

    def test_duplicate_name_dup_GUID2(self):
        target_dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        second_guid = b"aaa3456789abcdef"
        self.l.add(self._admins_record(target_dn, b"abc3456789abcdef"))
        try:
            # Same DN, but a GUID that has not been used yet
            self.l.add(self._admins_record(target_dn, second_guid))
            self.fail("Should have failed adding dupliate DN")
        except ldb.LdbError as err:
            code = err.args[0]
            self.assertEqual(code, ldb.ERR_ENTRY_ALREADY_EXISTS)

        # Checking the GUID didn't stick in the index
        self.l.add(self._admins_record("OU=DUP,DC=SAMBA,DC=ORG",
                                       second_guid))

    def test_add_dup_guid_add(self):
        taken_guid = b"0123456789abcde1"
        retry_dn = "OU=DUP2,DC=SAMBA,DC=ORG"
        self.l.add(self._admins_record("OU=DUP,DC=SAMBA,DC=ORG",
                                       taken_guid))
        try:
            # New DN, but the GUID belongs to OU=DUP
            self.l.add(self._admins_record(retry_dn, taken_guid))
            self.fail("Should have failed on duplicate GUID")

        except ldb.LdbError as err:
            code = err.args[0]
            self.assertEqual(code, ldb.ERR_CONSTRAINT_VIOLATION)

        # The same DN with a fresh GUID must still be accepted
        self.l.add(self._admins_record(retry_dn, b"0123456789abcde2"))

    def test_duplicate_index_values(self):
        # Two entries sharing one value for "z" must both be accepted.
        first = {"dn": "OU=DIV1,DC=SAMBA,DC=ORG",
                 "name": b"Admins",
                 "z": "1",
                 "objectUUID": b"0123456789abcdff"}
        second = {"dn": "OU=DIV2,DC=SAMBA,DC=ORG",
                  "name": b"Admins",
                  "z": "1",
                  "objectUUID": b"0123456789abcdfd"}
        self.l.add(first)
        self.l.add(second)
1874
1875
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Run the indexed add/modify tests with a GUID index, using
       objectUUID as the @IDXGUID attribute"""

    def setUp(self):
        # Assigned before the parent setUp() so this record is used
        # instead of the parent's default index.
        guid_index = {
            "dn": "@INDEXLIST",
            "@IDXATTR": [b"x", b"y", b"ou"],
            "@IDXONE": [b"1"],
            "@IDXGUID": [b"objectUUID"],
            "@IDX_DN_GUID": [b"GUID"],
        }
        self.index = guid_index
        super(GUIDIndexedAddModifyTests, self).setUp()
1887
1888
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        # Build the database first, then open a transaction that stays
        # open while the test body runs.
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        db = self.l
        db.transaction_start()

    def tearDown(self):
        # Commit the transaction opened in setUp() before the inherited
        # cleanup runs.
        db = self.l
        db.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
1899
1900
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        # Build the database first, then open a transaction that stays
        # open while the test body runs.
        super(TransIndexedAddModifyTests, self).setUp()
        db = self.l
        db.transaction_start()

    def tearDown(self):
        # Commit the transaction opened in setUp() before the inherited
        # cleanup runs.
        db = self.l
        db.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
1911
1912
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Run the GUID-indexed add/modify tests with the mdb:// prefix."""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against, so skip.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")

        # The prefix must be set before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1923
1924
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Run the in-transaction GUID index tests with the mdb:// prefix."""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against, so skip.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")

        # The prefix must be set before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1935
1936
class BadIndexTests(LdbBaseTest):
    """Check that searches keep working after changes to @ATTRIBUTES
    and after operations that fail.

    A subclass may set ``self.IDXGUID`` before calling setUp() to run
    the same tests with a GUID index; only the presence of the attribute
    is checked, not its value.
    """

    def setUp(self):
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def tearDown(self):
        # Remove the per-test database directory so repeated runs do not
        # leave temporary directories behind.
        shutil.rmtree(self.testdir)
        super(BadIndexTests, self).tearDown()

        # Drop our reference now so the LDB is closed and its FD released
        del(self.ldb)

    def _add_xyz(self, y_values):
        """Add x=x, x=y and x=z under dc=samba,dc=org, giving each entry
        the matching item of *y_values* as its "y" attribute."""
        guids = (b"0123456789abcde1",
                 b"0123456789abcde2",
                 b"0123456789abcde3")
        for rdn, guid, y in zip("xyz", guids, y_values):
            self.ldb.add({"dn": "x=%s,dc=samba,dc=org" % rdn,
                          "objectUUID": guid,
                          "y": y})

    def _count(self, expression):
        """Return how many entries under dc=samba,dc=org match
        *expression*."""
        res = self.ldb.search(expression=expression,
                              base="dc=samba,dc=org")
        return len(res)

    def _assert_casefold_matches(self):
        """Check the number of (y=a) matches once "y" has been made
        CASE_INSENSITIVE."""
        matches = self._count("(y=a)")
        if hasattr(self, 'IDXGUID'):
            self.assertEqual(matches, 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(matches, 4)

    def test_unique(self):
        self._add_xyz(["1", "1", "1"])
        self.assertEqual(self._count("(y=1)"), 3)

        # Now try to make "y" a unique index; with three entries sharing
        # one value this is expected to be refused
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
            self.fail("Should have failed setting UNIQUE_INDEX "
                      "on a duplicated value")
        except ldb.LdbError:
            pass

        # We must still have a working index
        self.assertEqual(self._count("(y=1)"), 3)

    def test_unique_transaction(self):
        self._add_xyz(["1", "1", "1"])
        self.assertEqual(self._count("(y=1)"), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, deliberately ignoring whether
        # the add itself reports an error
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        # Whatever the add reported, the commit must not succeed
        try:
            self.ldb.transaction_commit()
            self.fail("Commit should have failed")

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        self.assertEqual(self._count("(y=1)"), 3)

    def test_casefold(self):
        self._add_xyz(["a", "A", ["a", "A"]])
        self.assertEqual(self._count("(y=a)"), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        self._assert_casefold_matches()

    def test_casefold_transaction(self):
        self._add_xyz(["a", "A", ["a", "A"]])
        self.assertEqual(self._count("(y=a)"), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        self._assert_casefold_matches()

    def test_modify_transaction(self):
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "2",
                      "z": "2"})

        self.assertEqual(self._count("(y=2)"), 1)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "UNIQUE_INDEX"})

        self.ldb.transaction_start()

        # Delete "y", which exists, and "not-here", which does not
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
        m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
        m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")

        try:
            self.ldb.modify(m)
            self.fail("Modify should have failed on the missing attribute")

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)

        try:
            self.ldb.transaction_commit()
            # We should fail here, but we want to be sure
            # we fail below

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # The index should still be pointing to x=y
        self.assertEqual(self._count("(y=2)"), 1)

        try:
            self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
                          "objectUUID": b"0123456789abcde2",
                          "y": "2"})
            self.fail("Added unique attribute twice")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

        res = self.ldb.search(expression="(y=2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
2142
2143
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # BadIndexTests checks for the presence of this attribute to
        # select the GUID form of @INDEXLIST and the expected results,
        # so it has to exist before the parent setUp() runs.
        self.IDXGUID = True
        parent = super(GUIDBadIndexTests, self)
        parent.setUp()
2151
2152
class GUIDBadIndexTestsLmdb(BadIndexTests):
    """Run the bad-index tests in GUID index mode with the mdb://
    prefix."""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means there is no lmdb backend
        # to test against, so skip.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")

        # All three attributes are set before the parent setUp() runs.
        self.IDXGUID = True
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(GUIDBadIndexTestsLmdb, self).setUp()

    def tearDown(self):
        super(GUIDBadIndexTestsLmdb, self).tearDown()
2165
2166
class BatchModeTests(LdbBaseTest):
    """Tests against a database opened with the "batch_mode:1" option.

    A subclass may set ``self.IDXGUID`` before calling setUp() to use a
    GUID index; only the presence of the attribute is checked.
    """

    def setUp(self):
        super(BatchModeTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(),
                           flags=self.flags(),
                           options=["batch_mode:1"])
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def test_modify_transaction(self):
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "2",
                      "z": "2"})

        res = self.ldb.search(expression="(y=2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "UNIQUE_INDEX"})

        self.ldb.transaction_start()

        # Delete "y", which exists, and "not-here", which does not
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
        m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
        m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")

        try:
            self.ldb.modify(m)
            self.fail("Modify should have failed on the missing attribute")

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)

        # After the failed modify, the commit itself is expected to fail
        try:
            self.ldb.transaction_commit()
            self.fail("Commit should have failed as we were in batch mode")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

    def tearDown(self):
        # Remove the per-test database directory so repeated runs do not
        # leave temporary directories behind.
        shutil.rmtree(self.testdir)
        super(BatchModeTests, self).tearDown()

        # Drop our reference now so the LDB is closed and its FD released
        del(self.ldb)
2222
2223
2224 class DnTests(TestCase):
2225
2226     def setUp(self):
2227         super(DnTests, self).setUp()
2228         self.ldb = ldb.Ldb()
2229
2230     def tearDown(self):
2231         super(DnTests, self).tearDown()
2232         del(self.ldb)
2233
2234     def test_set_dn_invalid(self):
2235         x = ldb.Message()
2236
2237         def assign():
2238             x.dn = "astring"
2239         self.assertRaises(TypeError, assign)
2240
2241     def test_eq(self):
2242         x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2243         y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
2244         self.assertEqual(x, y)
2245         y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
2246         self.assertNotEqual(x, y)
2247
2248     def test_str(self):
2249         x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
2250         self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
2251
2252     def test_repr(self):
2253         x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
2254         self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
2255
2256     def test_get_casefold_2(self):
2257         x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
2258         self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
2259
2260     def test_validate(self):
2261         x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
2262         self.assertTrue(x.validate())
2263
2264     def test_parent(self):
2265         x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
2266         self.assertEqual("bar=bloe", x.parent().__str__())
2267
2268     def test_parent_nonexistent(self):
2269         x = ldb.Dn(self.ldb, "@BLA")
2270         self.assertEqual(None, x.parent())
2271
2272     def test_is_valid(self):
2273         x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
2274         self.assertTrue(x.is_valid())
2275         x = ldb.Dn(self.ldb, "")
2276         self.assertTrue(x.is_valid())
2277
2278     def test_is_special(self):
2279         x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
2280         self.assertFalse(x.is_special())
2281         x = ldb.Dn(self.ldb, "@FOOBAR")
2282         self.assertTrue(x.is_special())
2283
2284     def test_check_special(self):
2285         x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
2286         self.assertFalse(x.check_special("FOOBAR"))
2287         x = ldb.Dn(self.ldb, "@FOOBAR")
2288         self.assertTrue(x.check_special("@FOOBAR"))
2289
2290     def test_len(self):
2291         x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
2292         self.assertEqual(2, len(x))
2293         x = ldb.Dn(self.ldb, "dc=foo21")
2294         self.assertEqual(1, len(x))
2295
2296     def test_add_child(self):
2297         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2298         self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
2299         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2300
2301     def test_add_base(self):
2302         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2303         base = ldb.Dn(self.ldb, "bla=bloe")
2304         self.assertTrue(x.add_base(base))
2305         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2306
2307     def test_add_child_str(self):
2308         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
2309         self.assertTrue(x.add_child("bla=bloe"))
2310         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
2311
2312     def test_add_base_str(self):
2313         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
2314         base = "bla=bloe"
2315         self.assertTrue(x.add_base(base))
2316         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
2317
2318     def test_add(self):
2319         x = ldb.Dn(self.ldb, "dc=foo24")
2320         y = ldb.Dn(self.ldb, "bar=bla")
2321         self.assertEqual("dc=foo24,bar=bla", str(x + y))
2322
2323     def test_remove_base_components(self):
2324         x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
2325         x.remove_base_components(len(x) - 1)
2326         self.assertEqual("dc=foo24", str(x))
2327
2328     def test_parse_ldif(self):
2329         msgs = self.ldb.parse_ldif("dn: foo=bar\n")
2330         msg = next(msgs)
2331         self.assertEqual("foo=bar", str(msg[1].dn))
2332         self.assertTrue(isinstance(msg[1], ldb.Message))
2333         ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
2334         self.assertEqual("dn: foo=bar\n\n", ldif)
2335
2336     def test_parse_ldif_more(self):
2337         msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
2338         msg = next(msgs)
2339         self.assertEqual("foo=bar", str(msg[1].dn))
2340         msg = next(msgs)
2341         self.assertEqual("bar=bar", str(msg[1].dn))
2342
2343     def test_print_ldif(self):
2344         ldif = '''dn: dc=foo27
2345 foo: foo
2346
2347 '''
2348         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2349         self.msg["foo"] = [b"foo"]
2350         self.assertEqual(ldif,
2351                          self.ldb.write_ldif(self.msg,
2352                                              ldb.CHANGETYPE_NONE))
2353
2354     def test_print_ldif_binary(self):
2355         # this also confirms that ldb flags are set even without a URL)
2356         self.ldb = ldb.Ldb(flags=ldb.FLG_SHOW_BINARY)
2357         ldif = '''dn: dc=foo27
2358 foo: f
2359 öö
2360
2361 '''
2362         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2363         self.msg["foo"] = ["f\nöö"]
2364         self.assertEqual(ldif,
2365                          self.ldb.write_ldif(self.msg,
2366                                              ldb.CHANGETYPE_NONE))
2367
2368
2369     def test_print_ldif_no_base64_bad(self):
2370         ldif = '''dn: dc=foo27
2371 foo: f
2372 öö
2373
2374 '''
2375         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2376         self.msg["foo"] = ["f\nöö"]
2377         self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2378         self.assertEqual(ldif,
2379                          self.ldb.write_ldif(self.msg,
2380                                              ldb.CHANGETYPE_NONE))
2381
2382     def test_print_ldif_no_base64_good(self):
2383         ldif = '''dn: dc=foo27
2384 foo: föö
2385
2386 '''
2387         self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
2388         self.msg["foo"] = ["föö"]
2389         self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
2390         self.assertEqual(ldif,
2391                          self.ldb.write_ldif(self.msg,
2392                                              ldb.CHANGETYPE_NONE))
2393
2394     def test_canonical_string(self):
2395         x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
2396         self.assertEqual("/bloe/foo25", x.canonical_str())
2397
2398     def test_canonical_ex_string(self):
2399         x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
2400         self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
2401
2402     def test_ldb_is_child_of(self):
2403         """Testing ldb_dn_compare_dn"""
2404         dn1 = ldb.Dn(self.ldb, "dc=base")
2405         dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
2406         dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
2407         dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
2408
2409         self.assertTrue(dn1.is_child_of(dn1))
2410         self.assertTrue(dn2.is_child_of(dn1))
2411         self.assertTrue(dn4.is_child_of(dn1))
2412         self.assertTrue(dn4.is_child_of(dn3))
2413         self.assertTrue(dn4.is_child_of(dn4))
2414         self.assertFalse(dn3.is_child_of(dn2))
2415         self.assertFalse(dn1.is_child_of(dn4))
2416
2417     def test_ldb_is_child_of_str(self):
2418         """Testing ldb_dn_compare_dn"""
2419         dn1_str = "dc=base"
2420         dn2_str = "cn=foo,dc=base"
2421         dn3_str = "cn=bar,dc=base"
2422         dn4_str = "cn=baz,cn=bar,dc=base"
2423
2424         dn1 = ldb.Dn(self.ldb, dn1_str)
2425         dn2 = ldb.Dn(self.ldb, dn2_str)
2426         dn3 = ldb.Dn(self.ldb, dn3_str)
2427         dn4 = ldb.Dn(self.ldb, dn4_str)
2428
2429         self.assertTrue(dn1.is_child_of(dn1_str))
2430         self.assertTrue(dn2.is_child_of(dn1_str))
2431         self.assertTrue(dn4.is_child_of(dn1_str))
2432         self.assertTrue(dn4.is_child_of(dn3_str))
2433         self.assertTrue(dn4.is_child_of(dn4_str))
2434         self.assertFalse(dn3.is_child_of(dn2_str))
2435         self.assertFalse(dn1.is_child_of(dn4_str))
2436
2437     def test_get_component_name(self):
2438         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2439         self.assertEqual(dn.get_component_name(0), 'cn')
2440         self.assertEqual(dn.get_component_name(1), 'dc')
2441         self.assertEqual(dn.get_component_name(2), None)
2442         self.assertEqual(dn.get_component_name(-1), None)
2443
2444     def test_get_component_value(self):
2445         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2446         self.assertEqual(dn.get_component_value(0), 'foo')
2447         self.assertEqual(dn.get_component_value(1), 'base')
2448         self.assertEqual(dn.get_component_name(2), None)
2449         self.assertEqual(dn.get_component_name(-1), None)
2450
2451     def test_set_component(self):
2452         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2453         dn.set_component(0, 'cn', 'bar')
2454         self.assertEqual(str(dn), "cn=bar,dc=base")
2455         dn.set_component(1, 'o', 'asep')
2456         self.assertEqual(str(dn), "cn=bar,o=asep")
2457         self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
2458         self.assertEqual(str(dn), "cn=bar,o=asep")
2459         dn.set_component(1, 'o', 'a,b+c')
2460         self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
2461
2462     def test_set_component_bytes(self):
2463         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2464         dn.set_component(0, 'cn', b'bar')
2465         self.assertEqual(str(dn), "cn=bar,dc=base")
2466         dn.set_component(1, 'o', b'asep')
2467         self.assertEqual(str(dn), "cn=bar,o=asep")
2468
2469     def test_set_component_none(self):
2470         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2471         self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
2472
2473     def test_get_extended_component_null(self):
2474         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
2475         self.assertEqual(dn.get_extended_component("TEST"), None)
2476
2477     def test_get_extended_component(self):
2478         self.ldb._register_test_extensions()
2479         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2480         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2481
2482     def test_set_extended_component(self):
2483         self.ldb._register_test_extensions()
2484         dn = ldb.Dn(self.ldb, "dc=base")
2485         dn.set_extended_component("TEST", "foo")
2486         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
2487         dn.set_extended_component("TEST", b"bar")
2488         self.assertEqual(dn.get_extended_component("TEST"), b"bar")
2489
2490     def test_extended_str(self):
2491         self.ldb._register_test_extensions()
2492         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
2493         self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
2494
2495     def test_get_rdn_name(self):
2496         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2497         self.assertEqual(dn.get_rdn_name(), 'cn')
2498
2499     def test_get_rdn_value(self):
2500         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2501         self.assertEqual(dn.get_rdn_value(), 'foo')
2502
2503     def test_get_casefold(self):
2504         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2505         self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
2506
2507     def test_get_linearized(self):
2508         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2509         self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
2510
2511     def test_is_null(self):
2512         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
2513         self.assertFalse(dn.is_null())
2514
2515         dn = ldb.Dn(self.ldb, '')
2516         self.assertTrue(dn.is_null())
2517
2518
class LdbMsgTests(TestCase):
    # Exercises ldb.Message through its bytes interface and through its
    # ``.text`` view.  Every test starts from the empty message created
    # in setUp().

    def setUp(self):
        super(LdbMsgTests, self).setUp()
        self.msg = ldb.Message()

    def test_init_dn(self):
        dn = ldb.Dn(ldb.Ldb(), "dc=foo27")
        self.msg = ldb.Message(dn)
        self.assertEqual("dc=foo27", str(self.msg.dn))

    def test_iter_items(self):
        # Setting the DN takes items() from zero entries to one.
        self.assertEqual(0, len(self.msg.items()))
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
        self.assertEqual(1, len(self.msg.items()))

    def test_repr(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
        self.msg["dc"] = b"foo"

        # The value is rendered with a b'' prefix on Python 3 only.
        value = "b'foo'" if PY3 else "'foo'"
        # Either ordering of the two keys is accepted.
        accepted = [
            "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([%s])})" % value,
            "Message({'dc': MessageElement([%s]), 'dn': Dn('dc=foo29')})" % value,
        ]
        self.assertIn(repr(self.msg), accepted)
        # The text view has the same repr with ".text" appended.
        self.assertIn(repr(self.msg.text),
                      [text + ".text" for text in accepted])

    def test_len(self):
        self.assertEqual(0, len(self.msg))

    def test_notpresent(self):
        with self.assertRaises(KeyError):
            self.msg["foo"]

    def test_del(self):
        # Deleting an attribute that was never set must not raise.
        del self.msg["foo"]

    def test_add(self):
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(element)

    def test_add_text(self):
        element = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(element)

    def test_elements_empty(self):
        self.assertEqual([], self.msg.elements())

    def test_elements(self):
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(element)
        self.assertEqual([element], self.msg.elements())
        self.assertEqual([element.text], self.msg.text.elements())

    def test_add_value(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_multiple(self):
        # Two values under one attribute still count as a single entry.
        values = [b"foo", b"bla"]
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo", b"bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual(values, list(self.msg["foo"]))

    def test_add_value_multiple_text(self):
        values = ["foo", "bla"]
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo", "bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual(values, list(self.msg.text["foo"]))

    def test_set_value(self):
        # A second assignment replaces the first value.
        for value in (b"fool", b"bar"):
            self.msg["foo"] = [value]
            self.assertEqual([value], list(self.msg["foo"]))

    def test_set_value_text(self):
        for value in ("fool", "bar"):
            self.msg["foo"] = [value]
            self.assertEqual([value], list(self.msg.text["foo"]))

    def test_keys(self):
        # "dn" comes first, then the attributes in insertion order.
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = [b"bla"]
        self.msg["bar"] = [b"bla"]
        keys = list(self.msg.keys())
        self.assertEqual(["dn", "foo", "bar"], keys)

    def test_keys_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = ["bla"]
        self.msg["bar"] = ["bla"]
        keys = list(self.msg.text.keys())
        self.assertEqual(["dn", "foo", "bar"], keys)

    def test_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))

    def test_get_dn(self):
        # The DN is also reachable through get() under the key "dn".
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))

    def test_dn_text(self):
        # A DN assigned through the text view is visible on both views.
        self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        for view in (self.msg, self.msg.text):
            self.assertEqual("@BASEINFO", str(view.dn))

    def test_get_dn_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        for view in (self.msg, self.msg.text):
            self.assertEqual("@BASEINFO", str(view.get("dn")))

    def test_get_invalid(self):
        # A non-string key is rejected.
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        with self.assertRaises(TypeError):
            self.msg.get(42)

    def test_get_other(self):
        self.msg["foo"] = [b"bar"]
        self.assertEqual(b"bar", self.msg.get("foo")[0])
        self.assertEqual(b"bar", self.msg.get("foo", idx=0))
        # Index 1 does not exist: None, or the supplied default.
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_other_text(self):
        self.msg["foo"] = ["bar"]
        text = self.msg.text
        self.assertEqual(["bar"], list(text.get("foo")))
        self.assertEqual("bar", text.get("foo")[0])
        self.assertEqual("bar", text.get("foo", idx=0))
        # These last two lookups go through the bytes interface.
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_default(self):
        missing = "tatayoyo"
        self.assertEqual(None, self.msg.get(missing, idx=0))
        self.assertEqual("anniecordie", self.msg.get(missing, "anniecordie"))

    def test_get_default_text(self):
        missing = "tatayoyo"
        self.assertEqual(None, self.msg.text.get(missing, idx=0))
        self.assertEqual("anniecordie",
                         self.msg.text.get(missing, "anniecordie"))

    def test_get_unknown(self):
        self.assertEqual(None, self.msg.get("lalalala"))

    def test_get_unknown_text(self):
        self.assertEqual(None, self.msg.text.get("lalalala"))

    def test_msg_diff(self):
        db = ldb.Ldb()
        # Two records with the same DN that differ only in "baz".
        parsed = db.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
        first = next(parsed)[1]
        second = next(parsed)[1]
        diff = db.msg_diff(first, second)
        self.assertEqual("foo=bar", str(diff.get("dn")))
        # The unchanged attribute is absent from the diff.
        with self.assertRaises(KeyError):
            diff["foo"]
        self.assertEqual(1, len(diff))

    def test_equal_empty(self):
        left = ldb.Message()
        right = ldb.Message()
        self.assertEqual(left, right)

    def test_equal_simplel(self):
        db = ldb.Ldb()
        left = ldb.Message()
        left.dn = ldb.Dn(db, "foo=bar")
        right = ldb.Message()
        right.dn = ldb.Dn(db, "foo=bar")
        # Equal with the same DN only ...
        self.assertEqual(left, right)
        # ... and with the same attribute value ...
        left['foo'] = b'bar'
        right['foo'] = b'bar'
        self.assertEqual(left, right)
        # ... but not once one value differs.
        right['foo'] = b'blie'
        self.assertNotEqual(left, right)
        right['foo'] = b'blie'

    def test_from_dict(self):
        rec = {"dn": "dc=fromdict",
               "a1": [b"a1-val1", b"a1-val1"]}
        db = ldb.Ldb()
        # Each modification flag must be carried over to the element.
        all_flags = (ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE,
                     ldb.FLAG_MOD_DELETE)
        for flags in all_flags:
            built = ldb.Message.from_dict(db, rec, flags)
            self.assertEqual(rec["a1"], list(built["a1"]))
            self.assertEqual(flags, built["a1"].flags())
        # Wrong argument types and an invalid flag value are rejected.
        with self.assertRaises(TypeError):
            ldb.Message.from_dict(dict(), rec, ldb.FLAG_MOD_REPLACE)
        with self.assertRaises(TypeError):
            ldb.Message.from_dict(db, list(), ldb.FLAG_MOD_REPLACE)
        with self.assertRaises(ValueError):
            ldb.Message.from_dict(db, rec, 0)
        # Message.from_dict expects a dictionary with 'dn'.
        without_dn = {"a1": [b"a1-val1", b"a1-val1"]}
        with self.assertRaises(TypeError):
            ldb.Message.from_dict(db, without_dn, ldb.FLAG_MOD_REPLACE)

    def test_from_dict_text(self):
        rec = {"dn": "dc=fromdict",
               "a1": ["a1-val1", "a1-val1"]}
        db = ldb.Ldb()
        # Each modification flag must be carried over to the element.
        all_flags = (ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE,
                     ldb.FLAG_MOD_DELETE)
        for flags in all_flags:
            built = ldb.Message.from_dict(db, rec, flags)
            self.assertEqual(rec["a1"], list(built.text["a1"]))
            self.assertEqual(flags, built.text["a1"].flags())
        # Wrong argument types and an invalid flag value are rejected.
        with self.assertRaises(TypeError):
            ldb.Message.from_dict(dict(), rec, ldb.FLAG_MOD_REPLACE)
        with self.assertRaises(TypeError):
            ldb.Message.from_dict(db, list(), ldb.FLAG_MOD_REPLACE)
        with self.assertRaises(ValueError):
            ldb.Message.from_dict(db, rec, 0)
        # Message.from_dict expects a dictionary with 'dn'.
        without_dn = {"a1": ["a1-val1", "a1-val1"]}
        with self.assertRaises(TypeError):
            ldb.Message.from_dict(db, without_dn, ldb.FLAG_MOD_REPLACE)

    def test_copy_add_message_element(self):
        src = ldb.Message()
        src["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
        src["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
        names = ("1", "2")

        # Copy the elements across by item assignment.
        dst = ldb.Message()
        for name in names:
            dst[name] = src[name]
        for name in names:
            self.assertEqual(dst[name], src[name])

        # Copy the elements across with Message.add().
        dst = ldb.Message()
        for name in names:
            dst.add(src[name])
        for name in names:
            self.assertEqual(dst[name], src[name])

    def test_copy_add_message_element_text(self):
        src = ldb.Message()
        src["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
        src["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
        names = ("1", "2")

        # Copy by item assignment, then compare against the text view.
        dst = ldb.Message()
        for name in names:
            dst[name] = src[name]
        for name in names:
            self.assertEqual(dst[name], src.text[name])

        # Copy with Message.add(), then compare text-to-text and
        # bytes-to-bytes.
        dst = ldb.Message()
        for name in names:
            dst.add(src[name])
        for name in names:
            self.assertEqual(dst.text[name], src.text[name])
        for name in names:
            self.assertEqual(dst[name], src[name])
2771
2772
class MessageElementTests(TestCase):
    # Tests for ldb.MessageElement on its own: comparison, iteration,
    # indexing, repr and the ``.text`` view.

    def test_cmp_element(self):
        foo = ldb.MessageElement([b"foo"])
        same = ldb.MessageElement([b"foo"])
        other = ldb.MessageElement([b"bzr"])
        self.assertEqual(foo, same)
        self.assertNotEqual(foo, other)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from text.
        from_bytes = ldb.MessageElement([b"foo"])
        from_text = ldb.MessageElement(["foo"])
        self.assertEqual(from_bytes, from_text)

    def test_create_iterable(self):
        element = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(element))
        self.assertEqual(["foo"], list(element.text))

    def test_repr(self):
        # Values are rendered with a b'' prefix on Python 3 only.
        prefix = "b" if PY3 else ""

        single = ldb.MessageElement([b"foo"])
        expected = "MessageElement([%s'foo'])" % prefix
        self.assertEqual(expected, repr(single))
        self.assertEqual(expected + ".text", repr(single.text))

        double = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(double))
        # Multiple values are separated by a bare comma, without a space.
        expected = "MessageElement([%s'foo',%s'bla'])" % (prefix, prefix)
        self.assertEqual(expected, repr(double))
        self.assertEqual(expected + ".text", repr(double.text))

    def test_get_item(self):
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", element[0])
        self.assertEqual(b"bar", element[1])
        # Negative indexes count from the end.
        self.assertEqual(b"bar", element[-1])
        with self.assertRaises(IndexError):
            element[45]

    def test_get_item_text(self):
        element = ldb.MessageElement(["foo", "bar"])
        self.assertEqual("foo", element.text[0])
        self.assertEqual("bar", element.text[1])
        self.assertEqual("bar", element.text[-1])
        # The out-of-range lookup is made on the element itself, not on
        # its text view.
        with self.assertRaises(IndexError):
            element[45]

    def test_len(self):
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(element))

    def test_eq(self):
        pair_a = ldb.MessageElement([b"foo", b"bar"])
        pair_b = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(pair_b, pair_a)
        # A different number of values makes the elements unequal.
        single_a = ldb.MessageElement([b"foo"])
        self.assertNotEqual(pair_b, single_a)
        single_b = ldb.MessageElement([b"foo"])
        self.assertEqual(single_b, single_a)

    def test_extended(self):
        # Flags and name passed to the constructor do not show in repr().
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        prefix = "b" if PY3 else ""
        expected = "MessageElement([%s'456'])" % prefix
        self.assertEqual(expected, repr(element))
        self.assertEqual(expected + ".text", repr(element.text))

    def test_bad_text(self):
        # b'\xba\xdd' is not valid UTF-8, so reading it through the text
        # view must fail with UnicodeDecodeError.
        element = ldb.MessageElement(b'\xba\xdd')
        text_view = element.text
        with self.assertRaises(UnicodeDecodeError):
            text_view[0]
2848
2849
class ModuleTests(TestCase):
    # Tests for registering Python classes as ldb modules.  Each test
    # gets its own temporary directory holding a "test.ldb" database.

    def setUp(self):
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # Chain to the parent tearDown(); this used to call setUp() by
        # mistake, so any parent-class teardown was skipped.
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        # A class with only a ``name`` attribute can be registered.
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records module lifecycle events so the test can check when the
        # module gets instantiated.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        db = ldb.Ldb(self.filename)
        # List the module in @MODULES; it must not have been instantiated
        # by the handle that was already open.
        db.add({"dn": "@MODULES", "@LIST": "bla"})
        self.assertEqual([], ops)
        # Re-opening the database instantiates the module exactly once.
        db = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2889
2890
class LdbResultTests(LdbBaseTest):
    # Tests for the result object returned by Ldb.search() and for
    # Ldb.search_iterator(), including how both behave while another
    # process holds an open write transaction on the same database.

    def setUp(self):
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        # Subclasses may provide an index record in self.index; the base
        # class does not set one, hence the AttributeError guard.
        try:
            self.l.add(self.index)
        except AttributeError:
            pass
        # 13 records: the base DN plus 12 OUs directly beneath it.
        records = [
            ("DC=SAMBA,DC=ORG", b"samba.org", b"0123456789abcde0"),
            ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", b"0123456789abcde1"),
            ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", b"0123456789abcde2"),
            ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", b"0123456789abcde3"),
            ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", b"0123456789abcde4"),
            ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", b"0123456789abcde5"),
            ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", b"0123456789abcde6"),
            ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", b"0123456789abcde7"),
            ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", b"0123456789abcde8"),
            ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", b"0123456789abcde9"),
            ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", b"0123456789abcdea"),
            ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", b"0123456789abcdeb"),
            ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", b"0123456789abcdec"),
        ]
        for dn, name, uuid in records:
            self.l.add({"dn": dn, "name": name, "objectUUID": uuid})

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del self.l

    def _pipe(self):
        # Create a pipe whose two ends are closed again when the test is
        # cleaned up, so the fork-based tests do not leak descriptors.
        fds = os.pipe()
        for fd in fds:
            self.addCleanup(os.close, fd)
        return fds

    def test_return_type(self):
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        _ = res.msgs

    def test_get_controls(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        _ = res.controls

    def test_get_referals(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        _ = res.referals

    def test_iter_msgs(self):
        found = False
        for l in self.l.search().msgs:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        # Only checks that the controls are iterable.
        res = self.l.search().controls
        iter(res)

    def test_create_control(self):
        # An unknown control name is rejected; "relax:1" parses to a
        # critical control with the OID asserted below.
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        # Only checks that the referrals are iterable.
        res = self.l.search().referals
        iter(res)

    def test_search_sequence_msgs(self):
        found = False
        res = self.l.search().msgs

        # Indexing by position is deliberate: this test covers the
        # sequence interface rather than iteration.
        for i in range(0, len(res)):
            l = res[i]
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        found = False
        res = self.l.search()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        found = False
        res = self.l.search_iterator()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction

    def test_search_against_trans(self):
        found11 = False

        # r1/w1 carries progress messages from the child to the parent,
        # r2/w2 carries the "search" go-ahead from the parent to the
        # child.
        (r1, w1) = self._pipe()

        (r2, w2) = self._pipe()

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del self.l
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1 carries progress messages from the child to the parent,
        # r2/w2 carries the "search" go-ahead from the parent to the
        # child.
        (r1, w1) = self._pipe()

        (r2, w2) = self._pipe()

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del res
            del self.l
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
3158
3159
class LdbResultTestsLmdb(LdbResultTests):
    # Runs the inherited LdbResultTests cases with the mdb:// prefix and
    # the MDB index record.

    def setUp(self):
        # The environment variable HAVE_LMDB=0 marks the lmdb backend as
        # unavailable; it defaults to available when unset.
        lmdb_disabled = os.environ.get('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")
        # Both attributes must be in place before the parent setUp()
        # runs, since that is where they are consumed.
        self.index = MDB_INDEX_OBJ
        self.prefix = MDB_PREFIX
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        super(LdbResultTestsLmdb, self).tearDown()
3171
3172
class BadTypeTests(TestCase):
    # Each call below is handed an unsuitable argument and is expected
    # to be rejected with TypeError.

    def test_control(self):
        l = ldb.Ldb()
        with self.assertRaises(TypeError):
            ldb.Control('<bad type>', 'relax:1')
        # NOTE(review): the first argument here is the ``ldb`` module,
        # not the handle ``l`` created above -- confirm this is intended.
        with self.assertRaises(TypeError):
            ldb.Control(ldb, 1234)

    def test_modify(self):
        db = ldb.Ldb()
        message = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.modify('<bad type>')
        with self.assertRaises(TypeError):
            db.modify(message, '<bad type>')

    def test_add(self):
        db = ldb.Ldb()
        message = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(message, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite the test name this calls add(), not
        # delete() -- looks like a copy/paste; confirm before changing.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(dn, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite the test name this calls add(), not
        # rename() -- looks like a copy/paste; confirm before changing.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>', dn)
        with self.assertRaises(TypeError):
            db.add(dn, '<bad type>')
        with self.assertRaises(TypeError):
            db.add(dn, dn, '<bad type>')

    def test_search(self):
        db = ldb.Ldb()
        # One wrongly-typed keyword argument per call.
        bad_calls = (
            {"base": 1234},
            {"scope": '<bad type>'},
            {"expression": 1234},
            {"attrs": '<bad type>'},
            {"controls": '<bad type>'},
        )
        for kwargs in bad_calls:
            self.assertRaises(TypeError, db.search, **kwargs)
3213
3214
class VersionTests(TestCase):
    """Sanity checks on the module's version metadata."""

    def test_version(self):
        """ldb.__version__ is exposed as a str."""
        # assertIsInstance reports the offending type on failure, unlike
        # assertTrue(isinstance(...)) which only says "False is not true".
        self.assertIsInstance(ldb.__version__, str)
3219
class NestedTransactionTests(LdbBaseTest):
    """Document how ldb currently behaves when transactions are nested."""

    def setUp(self):
        """Create a fresh, indexed database in a private temporary directory."""
        super(NestedTransactionTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        self.ldb.add({"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]})

    def _count_by_uuid(self, uuid):
        """Return how many records under dc=samba,dc=org have this objectUUID."""
        res = self.ldb.search(expression="(objectUUID=%s)" % uuid,
                              base="dc=samba,dc=org")
        return len(res)

    #
    # This test documents that currently ldb does not support true nested
    # transactions.
    #
    # Note: The test is written so that it treats failure as pass.
    #       It is done this way as standalone ldb builds do not use the samba
    #       known fail mechanism
    #
    def test_nested_transactions(self):
        """Cancel an inner transaction and record that its add still persists."""
        self.ldb.transaction_start()

        self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1"})
        self.assertEqual(self._count_by_uuid("0123456789abcde1"), 1)

        self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2"})
        self.assertEqual(self._count_by_uuid("0123456789abcde2"), 1)

        # Open a nested transaction, add a record inside it, then cancel it.
        self.ldb.transaction_start()
        self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3"})
        self.assertEqual(self._count_by_uuid("0123456789abcde3"), 1)
        self.ldb.transaction_cancel()

        #
        # Check whether we can see the record added by the cancelled
        # transaction.
        #
        # FIXME this test currently passes on a failure, i.e. if nested
        #       transaction support worked correctly the correct test would
        #       be
        #         self.assertEqual(self._count_by_uuid(...), 0)
        #       as the add of objectUUID=0123456789abcde3 would be reverted
        #       when the sub transaction it was nested in was rolled back.
        #
        #       Currently ldb does not support true nested transactions, and
        #       only the outer commits and cancels have an effect, so the
        #       record is still present.
        #
        self.assertEqual(self._count_by_uuid("0123456789abcde3"), 1)

        # Commit the outer transaction.
        self.ldb.transaction_commit()

        #
        # Now check we can still see the records added in the outer
        # transaction.
        #
        self.assertEqual(self._count_by_uuid("0123456789abcde1"), 1)
        self.assertEqual(self._count_by_uuid("0123456789abcde2"), 1)

        #
        # And check the record added by the nested transaction.
        #
        # FIXME again, if nested transactions worked correctly we would not
        #       see this record and the expected count would be 0.
        #
        self.assertEqual(self._count_by_uuid("0123456789abcde3"), 1)

    def tearDown(self):
        """Release the database handle and remove the temporary directory."""
        # Drop our reference first; releasing the last reference should let
        # the bindings close the database before its directory is removed.
        del self.ldb
        # setUp creates a new mkdtemp directory per test; remove it so test
        # runs do not accumulate directories.
        shutil.rmtree(self.testdir)
        super(NestedTransactionTests, self).tearDown()
3309
3310
class LmdbNestedTransactionTests(NestedTransactionTests):
    """Re-run the nested transaction test with the mdb:// URL prefix."""

    def setUp(self):
        """Select the LMDB prefix and index, then run the inherited set-up.

        The test is skipped when the HAVE_LMDB environment variable is "0";
        an unset variable counts as LMDB being available.
        """
        lmdb_disabled = os.getenv('HAVE_LMDB', '1') == '0'
        if lmdb_disabled:
            self.skipTest("No lmdb backend")

        # Assigned before delegating so the inherited setUp builds its
        # database URL from the LMDB prefix.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super(LmdbNestedTransactionTests, self).setUp()

    def tearDown(self):
        """Defer entirely to the inherited tear-down."""
        super(LmdbNestedTransactionTests, self).tearDown()
3322
3323
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line runner,
    # which discovers and runs the test cases defined in this module.
    import unittest
    unittest.main()