s4:kdc: Implement KDC plugin hardware authentication policy
[samba.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python3
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 sys.path.insert(0, "bin/python")
9 import gc
10 import time
11 import ldb
12 import shutil
13 import errno
14
15
# URL scheme prefixes that LdbBaseTest.url() prepends to the database file
# name.  TDB_PREFIX is the default chosen by LdbBaseTest.setUp(); when the
# prefix is MDB_PREFIX, LdbBaseTest.flags() returns ldb.FLG_NOSYNC.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"

# Index record that the lmdb test variants assign to self.index; the setUp()
# methods add it to the freshly opened database when it is present.
MDB_INDEX_OBJ = {
    "dn": "@INDEXLIST",
    "@IDXONE": [b"1"],
    "@IDXGUID": [b"objectUUID"],
    "@IDX_DN_GUID": [b"GUID"]
}
25
26
def tempdir():
    """Create a fresh temporary directory and return its path.

    The directory is created under $SELFTEST_PREFIX/tmp when the
    SELFTEST_PREFIX environment variable is set; otherwise tempfile's
    default location is used.
    """
    import tempfile

    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
34
35
class NoContextTests(TestCase):
    """Tests for module-level ldb helpers that need no Ldb connection."""

    def test_valid_attr_name(self):
        """"foo" is a valid attribute name, "24foo" is not."""
        self.assertTrue(ldb.valid_attr_name("foo"))
        self.assertFalse(ldb.valid_attr_name("24foo"))

    def test_timestring(self):
        """timestring() formats epoch seconds and rejects out-of-range values."""
        formatted = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
            # First and last second that are expected to be representable.
            (-62167219200, "00000101000000.0Z"),
            (253402300799, "99991231235959.0Z"),
        )
        for seconds, text in formatted:
            self.assertEqual(text, ldb.timestring(seconds))

        # should result with OSError EOVERFLOW from gmtime()
        out_of_range = (-62167219201, 253402300800, 0x7fffffffffffffff)
        for seconds in out_of_range:
            with self.assertRaises(OSError) as caught:
                ldb.timestring(seconds)
            self.assertEqual(caught.exception.errno, errno.EOVERFLOW)

    def test_string_to_time(self):
        """string_to_time() turns timestamp strings back into epoch seconds."""
        parsed = (
            ("19700101000000.0Z", 0),
            ("19691231235959.0Z", -1),
            ("20071119191012.0Z", 1195499412),
            ("00000101000000.0Z", -62167219200),
            ("99991231235959.0Z", 253402300799),
        )
        for text, seconds in parsed:
            self.assertEqual(seconds, ldb.string_to_time(text))

    def test_binary_encode(self):
        """binary_encode() round-trips and gives the same result for str and bytes."""
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        self.assertEqual(ldb.binary_decode(from_bytes), raw)

        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
75
76
class LdbBaseTest(TestCase):
    """Common base for the ldb tests: supplies backend prefix, URL and flags.

    Subclasses may assign ``self.prefix`` before calling setUp() and must
    assign ``self.filename`` before calling url().
    """

    def setUp(self):
        super().setUp()
        # Fall back to the tdb prefix when none was chosen, i.e. the
        # attribute is either missing or explicitly None.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super().tearDown()

    def url(self):
        """Return the database URL: the prefix followed by the file name."""
        scheme = self.prefix
        return scheme + self.filename

    def flags(self):
        """Return ldb.FLG_NOSYNC for the mdb prefix and 0 otherwise."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
97
98
99 class SimpleLdb(LdbBaseTest):
100
101     def setUp(self):
102         super(SimpleLdb, self).setUp()
103         self.testdir = tempdir()
104         self.filename = os.path.join(self.testdir, "test.ldb")
105         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
106         try:
107             self.ldb.add(self.index)
108         except AttributeError:
109             pass
110
111     def tearDown(self):
112         shutil.rmtree(self.testdir)
113         super(SimpleLdb, self).tearDown()
114         # Ensure the LDB is closed now, so we close the FD
115         del(self.ldb)
116
117     def test_connect(self):
118         ldb.Ldb(self.url(), flags=self.flags())
119
120     def test_connect_none(self):
121         ldb.Ldb()
122
123     def test_connect_later(self):
124         x = ldb.Ldb()
125         x.connect(self.url(), flags=self.flags())
126
127     def test_connect_twice(self):
128         url = self.url()
129         x = ldb.Ldb(url)
130         with self.assertRaises(ldb.LdbError):
131             x.connect(url, flags=self.flags())
132
133     def test_connect_twice_later(self):
134         url = self.url()
135         flags = self.flags()
136         x = ldb.Ldb()
137         x.connect(url, flags)
138         with self.assertRaises(ldb.LdbError):
139             x.connect(url, flags)
140
141     def test_connect_and_disconnect(self):
142         url = self.url()
143         flags = self.flags()
144         x = ldb.Ldb()
145         x.connect(url, flags)
146         x.disconnect()
147         x.connect(url, flags)
148         x.disconnect()
149
150     def test_repr(self):
151         x = ldb.Ldb()
152         self.assertTrue(repr(x).startswith("<ldb connection"))
153
154     def test_set_create_perms(self):
155         x = ldb.Ldb()
156         x.set_create_perms(0o600)
157
158     def test_search(self):
159         l = ldb.Ldb(self.url(), flags=self.flags())
160         self.assertEqual(len(l.search()), 0)
161
162     def test_search_controls(self):
163         l = ldb.Ldb(self.url(), flags=self.flags())
164         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
165
166     def test_utf8_ldb_Dn(self):
167         l = ldb.Ldb(self.url(), flags=self.flags())
168         dn = ldb.Dn(l, (b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc').decode('utf8'))
169
170     def test_utf8_encoded_ldb_Dn(self):
171         l = ldb.Ldb(self.url(), flags=self.flags())
172         dn_encoded_utf8 = b'a=' + b'\xc4\x85\xc4\x87\xc4\x99\xc5\x82\xc5\x84\xc3\xb3\xc5\x9b\xc5\xba\xc5\xbc'
173         try:
174             dn = ldb.Dn(l, dn_encoded_utf8)
175         except UnicodeDecodeError as e:
176                 raise
177         except TypeError as te:
178            p3errors = ["argument 2 must be str, not bytes",
179                        "Can't convert 'bytes' object to str implicitly"]
180            self.assertIn(str(te), p3errors)
181
182     def test_search_attrs(self):
183         l = ldb.Ldb(self.url(), flags=self.flags())
184         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
185
186     def test_search_string_dn(self):
187         l = ldb.Ldb(self.url(), flags=self.flags())
188         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
189
190     def test_search_attr_string(self):
191         l = ldb.Ldb(self.url(), flags=self.flags())
192         self.assertRaises(TypeError, l.search, attrs="dc")
193         self.assertRaises(TypeError, l.search, attrs=b"dc")
194
195     def test_opaque(self):
196         l = ldb.Ldb(self.url(), flags=self.flags())
197         l.set_opaque("my_opaque", True)
198         self.assertTrue(l.get_opaque("my_opaque") is not None)
199         self.assertEqual(None, l.get_opaque("unknown"))
200
201     def test_opaque_bool(self):
202         """Test that we can set boolean opaque values."""
203
204         db = ldb.Ldb(self.url(), flags=self.flags())
205         name = "my_opaque"
206
207         db.set_opaque(name, False)
208         self.assertEqual(False, db.get_opaque(name))
209
210         db.set_opaque(name, True)
211         self.assertEqual(True, db.get_opaque(name))
212
213     def test_opaque_int(self):
214         """Test that we can set (positive) integer opaque values."""
215
216         db = ldb.Ldb(self.url(), flags=self.flags())
217         name = "my_opaque"
218
219         db.set_opaque(name, 0)
220         self.assertEqual(0, db.get_opaque(name))
221
222         db.set_opaque(name, 12345678)
223         self.assertEqual(12345678, db.get_opaque(name))
224
225         # Negative values can’t be set.
226         self.assertRaises(OverflowError, db.set_opaque, name, -99999)
227
228     def test_opaque_string(self):
229         """Test that we can set string opaque values."""
230
231         db = ldb.Ldb(self.url(), flags=self.flags())
232         name = "my_opaque"
233
234         db.set_opaque(name, "")
235         self.assertEqual("", db.get_opaque(name))
236
237         db.set_opaque(name, "foo bar")
238         self.assertEqual("foo bar", db.get_opaque(name))
239
240     def test_opaque_none(self):
241         """Test that we can set an opaque to None to effectively unset it."""
242
243         db = ldb.Ldb(self.url(), flags=self.flags())
244         name = "my_opaque"
245
246         # An opaque that has not been set is the same as None.
247         self.assertIsNone(db.get_opaque(name))
248
249         # Give the opaque a value.
250         db.set_opaque(name, 3)
251         self.assertEqual(3, db.get_opaque(name))
252
253         # Test that we can set the opaque to None to unset it.
254         db.set_opaque(name, None)
255         self.assertIsNone(db.get_opaque(name))
256
257     def test_opaque_unsupported(self):
258         """Test that trying to set unsupported values raises an error."""
259
260         db = ldb.Ldb(self.url(), flags=self.flags())
261         name = "my_opaque"
262
263         self.assertRaises(ValueError, db.set_opaque, name, [])
264         self.assertRaises(ValueError, db.set_opaque, name, ())
265         self.assertRaises(ValueError, db.set_opaque, name, 3.14)
266         self.assertRaises(ValueError, db.set_opaque, name, 3+2j)
267         self.assertRaises(ValueError, db.set_opaque, name, b'foo')
268
269     def test_search_scope_base_empty_db(self):
270         l = ldb.Ldb(self.url(), flags=self.flags())
271         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
272                                       ldb.SCOPE_BASE)), 0)
273
274     def test_search_scope_onelevel_empty_db(self):
275         l = ldb.Ldb(self.url(), flags=self.flags())
276         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
277                                       ldb.SCOPE_ONELEVEL)), 0)
278
279     def test_delete(self):
280         l = ldb.Ldb(self.url(), flags=self.flags())
281         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
282
283     def test_delete_w_unhandled_ctrl(self):
284         l = ldb.Ldb(self.url(), flags=self.flags())
285         m = ldb.Message()
286         m.dn = ldb.Dn(l, "dc=foo1")
287         m["b"] = [b"a"]
288         m["objectUUID"] = b"0123456789abcdef"
289         l.add(m)
290         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
291         l.delete(m.dn)
292
293     def test_contains(self):
294         name = self.url()
295         l = ldb.Ldb(name, flags=self.flags())
296         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
297         l = ldb.Ldb(name, flags=self.flags())
298         m = ldb.Message()
299         m.dn = ldb.Dn(l, "dc=foo3")
300         m["b"] = ["a"]
301         m["objectUUID"] = b"0123456789abcdef"
302         l.add(m)
303         try:
304             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
305             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
306         finally:
307             l.delete(m.dn)
308
309     def test_get_config_basedn(self):
310         l = ldb.Ldb(self.url(), flags=self.flags())
311         self.assertEqual(None, l.get_config_basedn())
312
313     def test_get_root_basedn(self):
314         l = ldb.Ldb(self.url(), flags=self.flags())
315         self.assertEqual(None, l.get_root_basedn())
316
317     def test_get_schema_basedn(self):
318         l = ldb.Ldb(self.url(), flags=self.flags())
319         self.assertEqual(None, l.get_schema_basedn())
320
321     def test_get_default_basedn(self):
322         l = ldb.Ldb(self.url(), flags=self.flags())
323         self.assertEqual(None, l.get_default_basedn())
324
325     def test_add(self):
326         l = ldb.Ldb(self.url(), flags=self.flags())
327         m = ldb.Message()
328         m.dn = ldb.Dn(l, "dc=foo4")
329         m["bla"] = b"bla"
330         m["objectUUID"] = b"0123456789abcdef"
331         self.assertEqual(len(l.search()), 0)
332         l.add(m)
333         try:
334             self.assertEqual(len(l.search()), 1)
335         finally:
336             l.delete(ldb.Dn(l, "dc=foo4"))
337
338     def test_search_iterator(self):
339         l = ldb.Ldb(self.url(), flags=self.flags())
340         s = l.search_iterator()
341         s.abandon()
342         try:
343             for me in s:
344                 self.fail()
345             self.fail()
346         except RuntimeError as re:
347             pass
348         try:
349             s.abandon()
350             self.fail()
351         except RuntimeError as re:
352             pass
353         try:
354             s.result()
355             self.fail()
356         except RuntimeError as re:
357             pass
358
359         s = l.search_iterator()
360         count = 0
361         for me in s:
362             self.assertTrue(isinstance(me, ldb.Message))
363             count += 1
364         r = s.result()
365         self.assertEqual(len(r), 0)
366         self.assertEqual(count, 0)
367
368         m1 = ldb.Message()
369         m1.dn = ldb.Dn(l, "dc=foo4")
370         m1["bla"] = b"bla"
371         m1["objectUUID"] = b"0123456789abcdef"
372         l.add(m1)
373         try:
374             s = l.search_iterator()
375             msgs = []
376             for me in s:
377                 self.assertTrue(isinstance(me, ldb.Message))
378                 count += 1
379                 msgs.append(me)
380             r = s.result()
381             self.assertEqual(len(r), 0)
382             self.assertEqual(len(msgs), 1)
383             self.assertEqual(msgs[0].dn, m1.dn)
384
385             m2 = ldb.Message()
386             m2.dn = ldb.Dn(l, "dc=foo5")
387             m2["bla"] = b"bla"
388             m2["objectUUID"] = b"0123456789abcdee"
389             l.add(m2)
390
391             s = l.search_iterator()
392             msgs = []
393             for me in s:
394                 self.assertTrue(isinstance(me, ldb.Message))
395                 count += 1
396                 msgs.append(me)
397             r = s.result()
398             self.assertEqual(len(r), 0)
399             self.assertEqual(len(msgs), 2)
400             if msgs[0].dn == m1.dn:
401                 self.assertEqual(msgs[0].dn, m1.dn)
402                 self.assertEqual(msgs[1].dn, m2.dn)
403             else:
404                 self.assertEqual(msgs[0].dn, m2.dn)
405                 self.assertEqual(msgs[1].dn, m1.dn)
406
407             s = l.search_iterator()
408             msgs = []
409             for me in s:
410                 self.assertTrue(isinstance(me, ldb.Message))
411                 count += 1
412                 msgs.append(me)
413                 break
414             try:
415                 s.result()
416                 self.fail()
417             except RuntimeError as re:
418                 pass
419             for me in s:
420                 self.assertTrue(isinstance(me, ldb.Message))
421                 count += 1
422                 msgs.append(me)
423                 break
424             for me in s:
425                 self.fail()
426
427             r = s.result()
428             self.assertEqual(len(r), 0)
429             self.assertEqual(len(msgs), 2)
430             if msgs[0].dn == m1.dn:
431                 self.assertEqual(msgs[0].dn, m1.dn)
432                 self.assertEqual(msgs[1].dn, m2.dn)
433             else:
434                 self.assertEqual(msgs[0].dn, m2.dn)
435                 self.assertEqual(msgs[1].dn, m1.dn)
436         finally:
437             l.delete(ldb.Dn(l, "dc=foo4"))
438             l.delete(ldb.Dn(l, "dc=foo5"))
439
440     def test_add_text(self):
441         l = ldb.Ldb(self.url(), flags=self.flags())
442         m = ldb.Message()
443         m.dn = ldb.Dn(l, "dc=foo4")
444         m["bla"] = "bla"
445         m["objectUUID"] = b"0123456789abcdef"
446         self.assertEqual(len(l.search()), 0)
447         l.add(m)
448         try:
449             self.assertEqual(len(l.search()), 1)
450         finally:
451             l.delete(ldb.Dn(l, "dc=foo4"))
452
453     def test_add_w_unhandled_ctrl(self):
454         l = ldb.Ldb(self.url(), flags=self.flags())
455         m = ldb.Message()
456         m.dn = ldb.Dn(l, "dc=foo4")
457         m["bla"] = b"bla"
458         self.assertEqual(len(l.search()), 0)
459         self.assertRaises(ldb.LdbError, lambda: l.add(m, ["search_options:1:2"]))
460
461     def test_add_dict(self):
462         l = ldb.Ldb(self.url(), flags=self.flags())
463         m = {"dn": ldb.Dn(l, "dc=foo5"),
464              "bla": b"bla",
465              "objectUUID": b"0123456789abcdef"}
466         self.assertEqual(len(l.search()), 0)
467         l.add(m)
468         try:
469             self.assertEqual(len(l.search()), 1)
470         finally:
471             l.delete(ldb.Dn(l, "dc=foo5"))
472
473     def test_add_dict_text(self):
474         l = ldb.Ldb(self.url(), flags=self.flags())
475         m = {"dn": ldb.Dn(l, "dc=foo5"),
476              "bla": "bla",
477              "objectUUID": b"0123456789abcdef"}
478         self.assertEqual(len(l.search()), 0)
479         l.add(m)
480         try:
481             self.assertEqual(len(l.search()), 1)
482         finally:
483             l.delete(ldb.Dn(l, "dc=foo5"))
484
485     def test_add_dict_string_dn(self):
486         l = ldb.Ldb(self.url(), flags=self.flags())
487         m = {"dn": "dc=foo6", "bla": b"bla",
488              "objectUUID": b"0123456789abcdef"}
489         self.assertEqual(len(l.search()), 0)
490         l.add(m)
491         try:
492             self.assertEqual(len(l.search()), 1)
493         finally:
494             l.delete(ldb.Dn(l, "dc=foo6"))
495
496     def test_add_dict_bytes_dn(self):
497         l = ldb.Ldb(self.url(), flags=self.flags())
498         m = {"dn": b"dc=foo6", "bla": b"bla",
499              "objectUUID": b"0123456789abcdef"}
500         self.assertEqual(len(l.search()), 0)
501         l.add(m)
502         try:
503             self.assertEqual(len(l.search()), 1)
504         finally:
505             l.delete(ldb.Dn(l, "dc=foo6"))
506
507     def test_rename(self):
508         l = ldb.Ldb(self.url(), flags=self.flags())
509         m = ldb.Message()
510         m.dn = ldb.Dn(l, "dc=foo7")
511         m["bla"] = b"bla"
512         m["objectUUID"] = b"0123456789abcdef"
513         self.assertEqual(len(l.search()), 0)
514         l.add(m)
515         try:
516             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
517             self.assertEqual(len(l.search()), 1)
518         finally:
519             l.delete(ldb.Dn(l, "dc=bar"))
520
521     def test_rename_string_dns(self):
522         l = ldb.Ldb(self.url(), flags=self.flags())
523         m = ldb.Message()
524         m.dn = ldb.Dn(l, "dc=foo8")
525         m["bla"] = b"bla"
526         m["objectUUID"] = b"0123456789abcdef"
527         self.assertEqual(len(l.search()), 0)
528         l.add(m)
529         self.assertEqual(len(l.search()), 1)
530         try:
531             l.rename("dc=foo8", "dc=bar")
532             self.assertEqual(len(l.search()), 1)
533         finally:
534             l.delete(ldb.Dn(l, "dc=bar"))
535
536     def test_rename_bad_string_dns(self):
537         l = ldb.Ldb(self.url(), flags=self.flags())
538         m = ldb.Message()
539         m.dn = ldb.Dn(l, "dc=foo8")
540         m["bla"] = b"bla"
541         m["objectUUID"] = b"0123456789abcdef"
542         self.assertEqual(len(l.search()), 0)
543         l.add(m)
544         self.assertEqual(len(l.search()), 1)
545         self.assertRaises(ldb.LdbError,lambda: l.rename("dcXfoo8", "dc=bar"))
546         self.assertRaises(ldb.LdbError,lambda: l.rename("dc=foo8", "dcXbar"))
547         l.delete(ldb.Dn(l, "dc=foo8"))
548
549     def test_empty_dn(self):
550         l = ldb.Ldb(self.url(), flags=self.flags())
551         self.assertEqual(0, len(l.search()))
552         m = ldb.Message()
553         m.dn = ldb.Dn(l, "dc=empty")
554         m["objectUUID"] = b"0123456789abcdef"
555         l.add(m)
556         rm = l.search()
557         self.assertEqual(1, len(rm))
558         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
559                          set(rm[0].keys()))
560
561         rm = l.search(m.dn)
562         self.assertEqual(1, len(rm))
563         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
564                          set(rm[0].keys()))
565         rm = l.search(m.dn, attrs=["blah"])
566         self.assertEqual(1, len(rm))
567         self.assertEqual(0, len(rm[0]))
568
569     def test_modify_delete(self):
570         l = ldb.Ldb(self.url(), flags=self.flags())
571         m = ldb.Message()
572         m.dn = ldb.Dn(l, "dc=modifydelete")
573         m["bla"] = [b"1234"]
574         m["objectUUID"] = b"0123456789abcdef"
575         l.add(m)
576         rm = l.search(m.dn)[0]
577         self.assertEqual([b"1234"], list(rm["bla"]))
578         try:
579             m = ldb.Message()
580             m.dn = ldb.Dn(l, "dc=modifydelete")
581             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
582             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
583             l.modify(m)
584             rm = l.search(m.dn)
585             self.assertEqual(1, len(rm))
586             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
587                              set(rm[0].keys()))
588             rm = l.search(m.dn, attrs=["bla"])
589             self.assertEqual(1, len(rm))
590             self.assertEqual(0, len(rm[0]))
591         finally:
592             l.delete(ldb.Dn(l, "dc=modifydelete"))
593
594     def test_modify_delete_text(self):
595         l = ldb.Ldb(self.url(), flags=self.flags())
596         m = ldb.Message()
597         m.dn = ldb.Dn(l, "dc=modifydelete")
598         m.text["bla"] = ["1234"]
599         m["objectUUID"] = b"0123456789abcdef"
600         l.add(m)
601         rm = l.search(m.dn)[0]
602         self.assertEqual(["1234"], list(rm.text["bla"]))
603         try:
604             m = ldb.Message()
605             m.dn = ldb.Dn(l, "dc=modifydelete")
606             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
607             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
608             l.modify(m)
609             rm = l.search(m.dn)
610             self.assertEqual(1, len(rm))
611             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
612                              set(rm[0].keys()))
613             rm = l.search(m.dn, attrs=["bla"])
614             self.assertEqual(1, len(rm))
615             self.assertEqual(0, len(rm[0]))
616         finally:
617             l.delete(ldb.Dn(l, "dc=modifydelete"))
618
619     def test_modify_add(self):
620         l = ldb.Ldb(self.url(), flags=self.flags())
621         m = ldb.Message()
622         m.dn = ldb.Dn(l, "dc=add")
623         m["bla"] = [b"1234"]
624         m["objectUUID"] = b"0123456789abcdef"
625         l.add(m)
626         try:
627             m = ldb.Message()
628             m.dn = ldb.Dn(l, "dc=add")
629             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
630             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
631             l.modify(m)
632             rm = l.search(m.dn)[0]
633             self.assertEqual(3, len(rm))
634             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
635         finally:
636             l.delete(ldb.Dn(l, "dc=add"))
637
638     def test_modify_add_text(self):
639         l = ldb.Ldb(self.url(), flags=self.flags())
640         m = ldb.Message()
641         m.dn = ldb.Dn(l, "dc=add")
642         m.text["bla"] = ["1234"]
643         m["objectUUID"] = b"0123456789abcdef"
644         l.add(m)
645         try:
646             m = ldb.Message()
647             m.dn = ldb.Dn(l, "dc=add")
648             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
649             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
650             l.modify(m)
651             rm = l.search(m.dn)[0]
652             self.assertEqual(3, len(rm))
653             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
654         finally:
655             l.delete(ldb.Dn(l, "dc=add"))
656
657     def test_modify_replace(self):
658         l = ldb.Ldb(self.url(), flags=self.flags())
659         m = ldb.Message()
660         m.dn = ldb.Dn(l, "dc=modify2")
661         m["bla"] = [b"1234", b"456"]
662         m["objectUUID"] = b"0123456789abcdef"
663         l.add(m)
664         try:
665             m = ldb.Message()
666             m.dn = ldb.Dn(l, "dc=modify2")
667             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
668             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
669             l.modify(m)
670             rm = l.search(m.dn)[0]
671             self.assertEqual(3, len(rm))
672             self.assertEqual([b"789"], list(rm["bla"]))
673             rm = l.search(m.dn, attrs=["bla"])[0]
674             self.assertEqual(1, len(rm))
675         finally:
676             l.delete(ldb.Dn(l, "dc=modify2"))
677
678     def test_modify_replace_text(self):
679         l = ldb.Ldb(self.url(), flags=self.flags())
680         m = ldb.Message()
681         m.dn = ldb.Dn(l, "dc=modify2")
682         m.text["bla"] = ["1234", "456"]
683         m["objectUUID"] = b"0123456789abcdef"
684         l.add(m)
685         try:
686             m = ldb.Message()
687             m.dn = ldb.Dn(l, "dc=modify2")
688             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
689             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
690             l.modify(m)
691             rm = l.search(m.dn)[0]
692             self.assertEqual(3, len(rm))
693             self.assertEqual(["789"], list(rm.text["bla"]))
694             rm = l.search(m.dn, attrs=["bla"])[0]
695             self.assertEqual(1, len(rm))
696         finally:
697             l.delete(ldb.Dn(l, "dc=modify2"))
698
699     def test_modify_flags_change(self):
700         l = ldb.Ldb(self.url(), flags=self.flags())
701         m = ldb.Message()
702         m.dn = ldb.Dn(l, "dc=add")
703         m["bla"] = [b"1234"]
704         m["objectUUID"] = b"0123456789abcdef"
705         l.add(m)
706         try:
707             m = ldb.Message()
708             m.dn = ldb.Dn(l, "dc=add")
709             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
710             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
711             l.modify(m)
712             rm = l.search(m.dn)[0]
713             self.assertEqual(3, len(rm))
714             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
715
716             # Now create another modify, but switch the flags before we do it
717             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
718             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
719             l.modify(m)
720             rm = l.search(m.dn, attrs=["bla"])[0]
721             self.assertEqual(1, len(rm))
722             self.assertEqual([b"1234"], list(rm["bla"]))
723         finally:
724             l.delete(ldb.Dn(l, "dc=add"))
725
726     def test_modify_flags_change_text(self):
727         l = ldb.Ldb(self.url(), flags=self.flags())
728         m = ldb.Message()
729         m.dn = ldb.Dn(l, "dc=add")
730         m.text["bla"] = ["1234"]
731         m["objectUUID"] = b"0123456789abcdef"
732         l.add(m)
733         try:
734             m = ldb.Message()
735             m.dn = ldb.Dn(l, "dc=add")
736             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
737             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
738             l.modify(m)
739             rm = l.search(m.dn)[0]
740             self.assertEqual(3, len(rm))
741             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
742
743             # Now create another modify, but switch the flags before we do it
744             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
745             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
746             l.modify(m)
747             rm = l.search(m.dn, attrs=["bla"])[0]
748             self.assertEqual(1, len(rm))
749             self.assertEqual(["1234"], list(rm.text["bla"]))
750         finally:
751             l.delete(ldb.Dn(l, "dc=add"))
752
753     def test_transaction_commit(self):
754         l = ldb.Ldb(self.url(), flags=self.flags())
755         l.transaction_start()
756         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
757         m["foo"] = [b"bar"]
758         m["objectUUID"] = b"0123456789abcdef"
759         l.add(m)
760         l.transaction_commit()
761         l.delete(m.dn)
762
763     def test_transaction_cancel(self):
764         l = ldb.Ldb(self.url(), flags=self.flags())
765         l.transaction_start()
766         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
767         m["foo"] = [b"bar"]
768         m["objectUUID"] = b"0123456789abcdee"
769         l.add(m)
770         l.transaction_cancel()
771         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
772
773     def test_set_debug(self):
774         def my_report_fn(level, text):
775             pass
776         l = ldb.Ldb(self.url(), flags=self.flags())
777         l.set_debug(my_report_fn)
778
779     def test_zero_byte_string(self):
780         """Testing we do not get trapped in the \0 byte in a property string."""
781         l = ldb.Ldb(self.url(), flags=self.flags())
782         l.add({
783             "dn": b"dc=somedn",
784             "objectclass": b"user",
785             "cN": b"LDAPtestUSER",
786             "givenname": b"ldap",
787             "displayname": b"foo\0bar",
788             "objectUUID": b"0123456789abcdef"
789         })
790         res = l.search(expression="(dn=dc=somedn)")
791         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
792
793     def test_no_crash_broken_expr(self):
794         l = ldb.Ldb(self.url(), flags=self.flags())
795         self.assertRaises(ldb.LdbError, lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
796
797 # Run the SimpleLdb tests against an lmdb backend
798
799
class SimpleLdbLmdb(SimpleLdb):
    """Run the inherited SimpleLdb tests with the mdb:// prefix.

    Every test is skipped when the HAVE_LMDB environment variable is "0".
    """

    def setUp(self):
        lmdb_available = os.environ.get('HAVE_LMDB', '1') != '0'
        if not lmdb_available:
            self.skipTest("No lmdb backend")
        # Both attributes must be in place before the base setUp() runs,
        # because it reads them when opening the database.
        self.prefix, self.index = MDB_PREFIX, MDB_INDEX_OBJ
        super().setUp()

    def tearDown(self):
        super().tearDown()
811
812
class SimpleLdbNoLmdb(LdbBaseTest):
    """Check that opening an mdb:// URL fails when lmdb is disabled.

    The test only runs when the HAVE_LMDB environment variable is "0".
    """

    def setUp(self):
        if os.environ.get('HAVE_LMDB', '1') != '0':
            self.skipTest("lmdb backend enabled")
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super(SimpleLdbNoLmdb, self).setUp()

    def tearDown(self):
        super(SimpleLdbNoLmdb, self).tearDown()

    def test_lmdb_disabled(self):
        """Connecting must raise LdbError whose first argument is ERR_OTHER."""
        self.testdir = tempdir()
        # tearDown() in this class does not remove the scratch directory,
        # so register its removal here to avoid leaking it.
        self.addCleanup(shutil.rmtree, self.testdir)
        self.filename = os.path.join(self.testdir, "test.ldb")
        with self.assertRaises(ldb.LdbError,
                               msg="Should have failed on missing LMDB") as caught:
            self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        enum = caught.exception.args[0]
        self.assertEqual(enum, ldb.ERR_OTHER)
834
835
836 class SearchTests(LdbBaseTest):
837     def tearDown(self):
838         shutil.rmtree(self.testdir)
839         super(SearchTests, self).tearDown()
840
841         # Ensure the LDB is closed now, so we close the FD
842         del(self.l)
843
844     def setUp(self):
845         super(SearchTests, self).setUp()
846         self.testdir = tempdir()
847         self.filename = os.path.join(self.testdir, "search_test.ldb")
848         options = ["modules:rdn_name"]
849         if hasattr(self, 'IDXCHECK'):
850             options.append("disable_full_db_scan_for_self_test:1")
851         self.l = ldb.Ldb(self.url(),
852                          flags=self.flags(),
853                          options=options)
854         try:
855             self.l.add(self.index)
856         except AttributeError:
857             pass
858
859         self.l.add({"dn": "@ATTRIBUTES",
860                     "DC": "CASE_INSENSITIVE"})
861
862         # Note that we can't use the name objectGUID here, as we
863         # want to stay clear of the objectGUID handler in LDB and
864         # instead use just the 16 bytes raw, which we just keep
865         # to printable chars here for ease of handling.
866
867         self.l.add({"dn": "DC=ORG",
868                     "name": b"org",
869                     "objectUUID": b"0000000000abcdef"})
870         self.l.add({"dn": "DC=EXAMPLE,DC=ORG",
871                     "name": b"org",
872                     "objectUUID": b"0000000001abcdef"})
873         self.l.add({"dn": "OU=OU1,DC=EXAMPLE,DC=ORG",
874                     "name": b"OU #1",
875                     "x": "y", "y": "a",
876                     "objectUUID": b"0023456789abcde3"})
877         self.l.add({"dn": "OU=OU2,DC=EXAMPLE,DC=ORG",
878                     "name": b"OU #2",
879                     "x": "y", "y": "a",
880                     "objectUUID": b"0023456789abcde4"})
881         self.l.add({"dn": "OU=OU3,DC=EXAMPLE,DC=ORG",
882                     "name": b"OU #3",
883                     "x": "y", "y": "a",
884                     "objectUUID": b"0023456789abcde5"})
885         self.l.add({"dn": "OU=OU4,DC=EXAMPLE,DC=ORG",
886                     "name": b"OU #4",
887                     "x": "z", "y": "b",
888                     "objectUUID": b"0023456789abcde6"})
889         self.l.add({"dn": "OU=OU5,DC=EXAMPLE,DC=ORG",
890                     "name": b"OU #5",
891                     "x": "y", "y": "a",
892                     "objectUUID": b"0023456789abcde7"})
893         self.l.add({"dn": "OU=OU6,DC=EXAMPLE,DC=ORG",
894                     "name": b"OU #6",
895                     "x": "y", "y": "a",
896                     "objectUUID": b"0023456789abcde8"})
897         self.l.add({"dn": "OU=OU7,DC=EXAMPLE,DC=ORG",
898                     "name": b"OU #7",
899                     "x": "y", "y": "c",
900                     "objectUUID": b"0023456789abcde9"})
901         self.l.add({"dn": "OU=OU8,DC=EXAMPLE,DC=ORG",
902                     "name": b"OU #8",
903                     "x": "y", "y": "b",
904                     "objectUUID": b"0023456789abcde0"})
905         self.l.add({"dn": "OU=OU9,DC=EXAMPLE,DC=ORG",
906                     "name": b"OU #9",
907                     "x": "y", "y": "a",
908                     "objectUUID": b"0023456789abcdea"})
909
910         self.l.add({"dn": "DC=EXAMPLE,DC=COM",
911                     "name": b"org",
912                     "objectUUID": b"0000000011abcdef"})
913
914         self.l.add({"dn": "DC=EXAMPLE,DC=NET",
915                     "name": b"org",
916                     "objectUUID": b"0000000021abcdef"})
917
918         self.l.add({"dn": "OU=UNIQUE,DC=EXAMPLE,DC=NET",
919                     "objectUUID": b"0000000022abcdef"})
920
921         self.l.add({"dn": "DC=SAMBA,DC=ORG",
922                     "name": b"samba.org",
923                     "objectUUID": b"0123456789abcdef"})
924         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
925                     "name": b"Admins",
926                     "x": "z", "y": "a",
927                     "objectUUID": b"0123456789abcde1"})
928         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
929                     "name": b"Users",
930                     "x": "z", "y": "a",
931                     "objectUUID": b"0123456789abcde2"})
932         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
933                     "name": b"OU #1",
934                     "x": "y", "y": "a",
935                     "objectUUID": b"0123456789abcde3"})
936         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
937                     "name": b"OU #2",
938                     "x": "y", "y": "a",
939                     "objectUUID": b"0123456789abcde4"})
940         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
941                     "name": b"OU #3",
942                     "x": "y", "y": "a",
943                     "objectUUID": b"0123456789abcde5"})
944         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
945                     "name": b"OU #4",
946                     "x": "y", "y": "a",
947                     "objectUUID": b"0123456789abcde6"})
948         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
949                     "name": b"OU #5",
950                     "x": "y", "y": "a",
951                     "objectUUID": b"0123456789abcde7"})
952         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
953                     "name": b"OU #6",
954                     "x": "y", "y": "a",
955                     "objectUUID": b"0123456789abcde8"})
956         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
957                     "name": b"OU #7",
958                     "x": "y", "y": "a",
959                     "objectUUID": b"0123456789abcde9"})
960         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
961                     "name": b"OU #8",
962                     "x": "y", "y": "a",
963                     "objectUUID": b"0123456789abcde0"})
964         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
965                     "name": b"OU #9",
966                     "x": "y", "y": "a",
967                     "objectUUID": b"0123456789abcdea"})
968         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
969                     "name": b"OU #10",
970                     "x": "y", "y": "a",
971                     "objectUUID": b"0123456789abcdeb"})
972         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
973                     "name": b"OU #10",
974                     "x": "y", "y": "a",
975                     "objectUUID": b"0123456789abcdec"})
976         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
977                     "name": b"OU #10",
978                     "x": "y", "y": "b",
979                     "objectUUID": b"0123456789abcded"})
980         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
981                     "name": b"OU #10",
982                     "x": "x", "y": "b",
983                     "objectUUID": b"0123456789abcdee"})
984         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
985                     "name": b"OU #10",
986                     "x": "x", "y": "b",
987                     "objectUUID": b"0123456789abcd01"})
988         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
989                     "name": b"OU #10",
990                     "x": "x", "y": "b",
991                     "objectUUID": b"0123456789abcd02"})
992         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
993                     "name": b"OU #10",
994                     "x": "x", "y": "b",
995                     "objectUUID": b"0123456789abcd03"})
996         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
997                     "name": b"OU #10",
998                     "x": "x", "y": "b",
999                     "objectUUID": b"0123456789abcd04"})
1000         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
1001                     "name": b"OU #10",
1002                     "x": "x", "y": "b",
1003                     "objectUUID": b"0123456789abcd05"})
1004         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
1005                     "name": b"OU #10",
1006                     "x": "x", "y": "b",
1007                     "objectUUID": b"0123456789abcd06"})
1008         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
1009                     "name": b"OU #10",
1010                     "x": "x", "y": "b",
1011                     "objectUUID": b"0123456789abcd07"})
1012         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
1013                     "name": b"OU #10",
1014                     "x": "x", "y": "c",
1015                     "objectUUID": b"0123456789abcd08"})
1016         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
1017                     "name": b"OU #10",
1018                     "x": "x", "y": "c",
1019                     "objectUUID": b"0123456789abcd09"})
1020
1021     def test_base(self):
1022         """Testing a search"""
1023
1024         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1025                               scope=ldb.SCOPE_BASE)
1026         self.assertEqual(len(res11), 1)
1027
1028     def test_base_lower(self):
1029         """Testing a search"""
1030
1031         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
1032                               scope=ldb.SCOPE_BASE)
1033         self.assertEqual(len(res11), 1)
1034
1035     def test_base_or(self):
1036         """Testing a search"""
1037
1038         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1039                               scope=ldb.SCOPE_BASE,
1040                               expression="(|(ou=ou11)(ou=ou12))")
1041         self.assertEqual(len(res11), 1)
1042
1043     def test_base_or2(self):
1044         """Testing a search"""
1045
1046         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1047                               scope=ldb.SCOPE_BASE,
1048                               expression="(|(x=y)(y=b))")
1049         self.assertEqual(len(res11), 1)
1050
1051     def test_base_and(self):
1052         """Testing a search"""
1053
1054         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1055                               scope=ldb.SCOPE_BASE,
1056                               expression="(&(ou=ou11)(ou=ou12))")
1057         self.assertEqual(len(res11), 0)
1058
1059     def test_base_and2(self):
1060         """Testing a search"""
1061
1062         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1063                               scope=ldb.SCOPE_BASE,
1064                               expression="(&(x=y)(y=a))")
1065         self.assertEqual(len(res11), 1)
1066
1067     def test_base_false(self):
1068         """Testing a search"""
1069
1070         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1071                               scope=ldb.SCOPE_BASE,
1072                               expression="(|(ou=ou13)(ou=ou12))")
1073         self.assertEqual(len(res11), 0)
1074
1075     def test_check_base_false(self):
1076         """Testing a search"""
1077         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1078                               scope=ldb.SCOPE_BASE,
1079                               expression="(|(ou=ou13)(ou=ou12))")
1080         self.assertEqual(len(res11), 0)
1081
1082     def test_check_base_error(self):
1083         """Testing a search"""
1084         checkbaseonsearch = {"dn": "@OPTIONS",
1085                              "checkBaseOnSearch": b"TRUE"}
1086         try:
1087             self.l.add(checkbaseonsearch)
1088         except ldb.LdbError as err:
1089             enum = err.args[0]
1090             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1091             m = ldb.Message.from_dict(self.l,
1092                                       checkbaseonsearch)
1093             self.l.modify(m)
1094
1095         try:
1096             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
1097                                   scope=ldb.SCOPE_BASE,
1098                                   expression="(|(ou=ou13)(ou=ou12))")
1099             self.fail("Should have failed on missing base")
1100         except ldb.LdbError as err:
1101             enum = err.args[0]
1102             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1103
1104     def test_subtree(self):
1105         """Testing a search"""
1106
1107         try:
1108             res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1109                                   scope=ldb.SCOPE_SUBTREE)
1110             if hasattr(self, 'IDXCHECK'):
1111                 self.fail()
1112         except ldb.LdbError as err:
1113             enum = err.args[0]
1114             estr = err.args[1]
1115             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1116             self.assertIn(estr, "ldb FULL SEARCH disabled")
1117         else:
1118             self.assertEqual(len(res11), 25)
1119
1120     def test_subtree2(self):
1121         """Testing a search"""
1122
1123         try:
1124             res11 = self.l.search(base="DC=ORG",
1125                                   scope=ldb.SCOPE_SUBTREE)
1126             if hasattr(self, 'IDXCHECK'):
1127                 self.fail()
1128         except ldb.LdbError as err:
1129             enum = err.args[0]
1130             estr = err.args[1]
1131             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1132             self.assertIn(estr, "ldb FULL SEARCH disabled")
1133         else:
1134             self.assertEqual(len(res11), 36)
1135
1136     def test_subtree_and(self):
1137         """Testing a search"""
1138
1139         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1140                               scope=ldb.SCOPE_SUBTREE,
1141                               expression="(&(ou=ou11)(ou=ou12))")
1142         self.assertEqual(len(res11), 0)
1143
1144     def test_subtree_and2(self):
1145         """Testing a search"""
1146
1147         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1148                               scope=ldb.SCOPE_SUBTREE,
1149                               expression="(&(x=y)(|(y=b)(y=c)))")
1150         self.assertEqual(len(res11), 1)
1151
1152     def test_subtree_and2_lower(self):
1153         """Testing a search"""
1154
1155         res11 = self.l.search(base="DC=samba,DC=org",
1156                               scope=ldb.SCOPE_SUBTREE,
1157                               expression="(&(x=y)(|(y=b)(y=c)))")
1158         self.assertEqual(len(res11), 1)
1159
1160     def test_subtree_or(self):
1161         """Testing a search"""
1162
1163         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1164                               scope=ldb.SCOPE_SUBTREE,
1165                               expression="(|(ou=ou11)(ou=ou12))")
1166         self.assertEqual(len(res11), 2)
1167
1168     def test_subtree_or2(self):
1169         """Testing a search"""
1170
1171         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1172                               scope=ldb.SCOPE_SUBTREE,
1173                               expression="(|(x=y)(y=b))")
1174         self.assertEqual(len(res11), 20)
1175
1176     def test_subtree_or3(self):
1177         """Testing a search"""
1178
1179         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1180                               scope=ldb.SCOPE_SUBTREE,
1181                               expression="(|(x=y)(y=b)(y=c))")
1182         self.assertEqual(len(res11), 22)
1183
1184     def test_one_and(self):
1185         """Testing a search"""
1186
1187         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1188                               scope=ldb.SCOPE_ONELEVEL,
1189                               expression="(&(ou=ou11)(ou=ou12))")
1190         self.assertEqual(len(res11), 0)
1191
1192     def test_one_and2(self):
1193         """Testing a search"""
1194
1195         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1196                               scope=ldb.SCOPE_ONELEVEL,
1197                               expression="(&(x=y)(y=b))")
1198         self.assertEqual(len(res11), 1)
1199
1200     def test_one_or(self):
1201         """Testing a search"""
1202
1203         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1204                               scope=ldb.SCOPE_ONELEVEL,
1205                               expression="(|(ou=ou11)(ou=ou12))")
1206         self.assertEqual(len(res11), 2)
1207
1208     def test_one_or2(self):
1209         """Testing a search"""
1210
1211         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1212                               scope=ldb.SCOPE_ONELEVEL,
1213                               expression="(|(x=y)(y=b))")
1214         self.assertEqual(len(res11), 20)
1215
1216     def test_one_or2_lower(self):
1217         """Testing a search"""
1218
1219         res11 = self.l.search(base="DC=samba,DC=org",
1220                               scope=ldb.SCOPE_ONELEVEL,
1221                               expression="(|(x=y)(y=b))")
1222         self.assertEqual(len(res11), 20)
1223
1224     def test_one_unindexable(self):
1225         """Testing a search"""
1226
1227         try:
1228             res11 = self.l.search(base="DC=samba,DC=org",
1229                                   scope=ldb.SCOPE_ONELEVEL,
1230                                   expression="(y=b*)")
1231             if hasattr(self, 'IDX') and \
1232                not hasattr(self, 'IDXONE') and \
1233                hasattr(self, 'IDXCHECK'):
1234                 self.fail("Should have failed as un-indexed search")
1235
1236             self.assertEqual(len(res11), 9)
1237
1238         except ldb.LdbError as err:
1239             enum = err.args[0]
1240             estr = err.args[1]
1241             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1242             self.assertIn(estr, "ldb FULL SEARCH disabled")
1243
1244     def test_one_unindexable_presence(self):
1245         """Testing a search"""
1246
1247         try:
1248             res11 = self.l.search(base="DC=samba,DC=org",
1249                                   scope=ldb.SCOPE_ONELEVEL,
1250                                   expression="(y=*)")
1251             if hasattr(self, 'IDX') and \
1252                not hasattr(self, 'IDXONE') and \
1253                hasattr(self, 'IDXCHECK'):
1254                 self.fail("Should have failed as un-indexed search")
1255
1256             self.assertEqual(len(res11), 24)
1257
1258         except ldb.LdbError as err:
1259             enum = err.args[0]
1260             estr = err.args[1]
1261             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1262             self.assertIn(estr, "ldb FULL SEARCH disabled")
1263
1264     def test_subtree_and_or(self):
1265         """Testing a search"""
1266
1267         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1268                               scope=ldb.SCOPE_SUBTREE,
1269                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1270         self.assertEqual(len(res11), 0)
1271
1272     def test_subtree_and_or2(self):
1273         """Testing a search"""
1274
1275         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1276                               scope=ldb.SCOPE_SUBTREE,
1277                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1278         self.assertEqual(len(res11), 0)
1279
1280     def test_subtree_and_or3(self):
1281         """Testing a search"""
1282
1283         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1284                               scope=ldb.SCOPE_SUBTREE,
1285                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1286         self.assertEqual(len(res11), 2)
1287
1288     def test_subtree_and_or4(self):
1289         """Testing a search"""
1290
1291         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1292                               scope=ldb.SCOPE_SUBTREE,
1293                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1294         self.assertEqual(len(res11), 2)
1295
1296     def test_subtree_and_or5(self):
1297         """Testing a search"""
1298
1299         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1300                               scope=ldb.SCOPE_SUBTREE,
1301                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1302         self.assertEqual(len(res11), 1)
1303
1304     def test_subtree_or_and(self):
1305         """Testing a search"""
1306
1307         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1308                               scope=ldb.SCOPE_SUBTREE,
1309                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1310         self.assertEqual(len(res11), 10)
1311
1312     def test_subtree_large_and_unique(self):
1313         """Testing a search"""
1314
1315         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1316                               scope=ldb.SCOPE_SUBTREE,
1317                               expression="(&(ou=ou10)(y=a))")
1318         self.assertEqual(len(res11), 1)
1319
1320     def test_subtree_unique(self):
1321         """Testing a search"""
1322
1323         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1324                               scope=ldb.SCOPE_SUBTREE,
1325                               expression="(ou=ou10)")
1326         self.assertEqual(len(res11), 1)
1327
1328     def test_subtree_unique_elsewhere(self):
1329         """Testing a search"""
1330
1331         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1332                               scope=ldb.SCOPE_SUBTREE,
1333                               expression="(ou=ou10)")
1334         self.assertEqual(len(res11), 0)
1335
1336     def test_subtree_unique_elsewhere2(self):
1337         """Testing a search"""
1338
1339         res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1340                               scope=ldb.SCOPE_SUBTREE,
1341                               expression="(ou=unique)")
1342         self.assertEqual(len(res11), 1)
1343
1344     def test_subtree_uni123_elsewhere(self):
1345         """Testing a search, where the search term contains a (normal ASCII)
1346         dotted-i, that will be upper-cased to 'Ä°', U+0130, LATIN
1347         CAPITAL LETTER I WITH DOT ABOVE in certain locales including
1348         tr_TR in which this test is sometimes run.
1349
1350         The search term should fail because the ou does not exist, but
1351         we used to get it wrong in unindexed searches which stopped
1352         comparing at the i, ignoring the rest of the string, which is
1353         not the same as the existing ou ('123' != 'que').
1354         """
1355         res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1356                               scope=ldb.SCOPE_SUBTREE,
1357                               expression="(ou=uni123)")
1358         self.assertEqual(len(res11), 0)
1359
1360     def test_subtree_unique_elsewhere3(self):
1361         """Testing a search"""
1362
1363         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1364                               scope=ldb.SCOPE_SUBTREE,
1365                               expression="(ou=unique)")
1366         self.assertEqual(len(res11), 0)
1367
1368     def test_subtree_unique_elsewhere4(self):
1369         """Testing a search"""
1370
1371         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1372                               scope=ldb.SCOPE_SUBTREE,
1373                               expression="(ou=unique)")
1374         self.assertEqual(len(res11), 0)
1375
1376     def test_subtree_unique_elsewhere5(self):
1377         """Testing a search"""
1378
1379         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1380                               scope=ldb.SCOPE_SUBTREE,
1381                               expression="(ou=unique)")
1382         self.assertEqual(len(res11), 0)
1383
1384     def test_subtree_unique_elsewhere6(self):
1385         """Testing a search"""
1386
1387         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1388                               scope=ldb.SCOPE_SUBTREE,
1389                               expression="(ou=unique)")
1390         self.assertEqual(len(res11), 0)
1391
1392     def test_subtree_unique_elsewhere7(self):
1393         """Testing a search"""
1394
1395         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1396                               scope=ldb.SCOPE_SUBTREE,
1397                               expression="(ou=ou10)")
1398         self.assertEqual(len(res11), 0)
1399
1400     def test_subtree_unique_here(self):
1401         """Testing a search"""
1402
1403         res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1404                               scope=ldb.SCOPE_SUBTREE,
1405                               expression="(ou=unique)")
1406         self.assertEqual(len(res11), 1)
1407
1408     def test_subtree_and_none(self):
1409         """Testing a search"""
1410
1411         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1412                               scope=ldb.SCOPE_SUBTREE,
1413                               expression="(&(ou=ouX)(y=a))")
1414         self.assertEqual(len(res11), 0)
1415
1416     def test_subtree_and_idx_record(self):
1417         """Testing a search against the index record"""
1418
1419         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1420                               scope=ldb.SCOPE_SUBTREE,
1421                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1422         self.assertEqual(len(res11), 0)
1423
1424     def test_subtree_and_idxone_record(self):
1425         """Testing a search against the index record"""
1426
1427         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1428                               scope=ldb.SCOPE_SUBTREE,
1429                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1430         self.assertEqual(len(res11), 0)
1431
1432     def test_onelevel(self):
1433         """Testing a search"""
1434
1435         try:
1436             res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1437                                   scope=ldb.SCOPE_ONELEVEL)
1438             if hasattr(self, 'IDXCHECK') \
1439                and not hasattr(self, 'IDXONE'):
1440                 self.fail()
1441         except ldb.LdbError as err:
1442             enum = err.args[0]
1443             estr = err.args[1]
1444             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1445             self.assertIn(estr, "ldb FULL SEARCH disabled")
1446         else:
1447             self.assertEqual(len(res11), 24)
1448
1449     def test_onelevel2(self):
1450         """Testing a search"""
1451
1452         try:
1453             res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1454                                   scope=ldb.SCOPE_ONELEVEL)
1455             if hasattr(self, 'IDXCHECK') \
1456                and not hasattr(self, 'IDXONE'):
1457                 self.fail()
1458                 self.fail()
1459         except ldb.LdbError as err:
1460             enum = err.args[0]
1461             estr = err.args[1]
1462             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1463             self.assertIn(estr, "ldb FULL SEARCH disabled")
1464         else:
1465             self.assertEqual(len(res11), 9)
1466
1467     def test_onelevel_and_or(self):
1468         """Testing a search"""
1469
1470         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1471                               scope=ldb.SCOPE_ONELEVEL,
1472                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1473         self.assertEqual(len(res11), 0)
1474
1475     def test_onelevel_and_or2(self):
1476         """Testing a search"""
1477
1478         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1479                               scope=ldb.SCOPE_ONELEVEL,
1480                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1481         self.assertEqual(len(res11), 0)
1482
1483     def test_onelevel_and_or3(self):
1484         """Testing a search"""
1485
1486         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1487                               scope=ldb.SCOPE_ONELEVEL,
1488                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1489         self.assertEqual(len(res11), 2)
1490
1491     def test_onelevel_and_or4(self):
1492         """Testing a search"""
1493
1494         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1495                               scope=ldb.SCOPE_ONELEVEL,
1496                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1497         self.assertEqual(len(res11), 2)
1498
1499     def test_onelevel_and_or5(self):
1500         """Testing a search"""
1501
1502         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1503                               scope=ldb.SCOPE_ONELEVEL,
1504                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1505         self.assertEqual(len(res11), 1)
1506
1507     def test_onelevel_or_and(self):
1508         """Testing a search"""
1509
1510         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1511                               scope=ldb.SCOPE_ONELEVEL,
1512                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1513         self.assertEqual(len(res11), 10)
1514
1515     def test_onelevel_large_and_unique(self):
1516         """Testing a search"""
1517
1518         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1519                               scope=ldb.SCOPE_ONELEVEL,
1520                               expression="(&(ou=ou10)(y=a))")
1521         self.assertEqual(len(res11), 1)
1522
1523     def test_onelevel_unique(self):
1524         """Testing a search"""
1525
1526         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1527                               scope=ldb.SCOPE_ONELEVEL,
1528                               expression="(ou=ou10)")
1529         self.assertEqual(len(res11), 1)
1530
1531     def test_onelevel_unique_elsewhere(self):
1532         """Testing a search"""
1533
1534         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1535                               scope=ldb.SCOPE_ONELEVEL,
1536                               expression="(ou=ou10)")
1537         self.assertEqual(len(res11), 0)
1538
1539     def test_onelevel_unique_elsewhere2(self):
1540         """Testing a search (showing that onelevel is not subtree)"""
1541
1542         res11 = self.l.search(base="DC=EXAMPLE,DC=NET",
1543                               scope=ldb.SCOPE_ONELEVEL,
1544                               expression="(ou=unique)")
1545         self.assertEqual(len(res11), 1)
1546
1547     def test_onelevel_unique_elsewhere3(self):
1548         """Testing a search (showing that onelevel is not subtree)"""
1549
1550         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1551                               scope=ldb.SCOPE_ONELEVEL,
1552                               expression="(ou=unique)")
1553         self.assertEqual(len(res11), 0)
1554
1555     def test_onelevel_unique_elsewhere4(self):
1556         """Testing a search (showing that onelevel is not subtree)"""
1557
1558         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1559                               scope=ldb.SCOPE_ONELEVEL,
1560                               expression="(ou=unique)")
1561         self.assertEqual(len(res11), 0)
1562
1563     def test_onelevel_unique_elsewhere5(self):
1564         """Testing a search (showing that onelevel is not subtree)"""
1565
1566         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1567                               scope=ldb.SCOPE_ONELEVEL,
1568                               expression="(ou=unique)")
1569         self.assertEqual(len(res11), 0)
1570
1571     def test_onelevel_unique_elsewhere6(self):
1572         """Testing a search"""
1573
1574         res11 = self.l.search(base="DC=EXAMPLE,DC=COM",
1575                               scope=ldb.SCOPE_ONELEVEL,
1576                               expression="(ou=ou10)")
1577         self.assertEqual(len(res11), 0)
1578
1579     def test_onelevel_unique_here(self):
1580         """Testing a search"""
1581
1582         res11 = self.l.search(base="OU=UNIQUE,DC=EXAMPLE,DC=NET",
1583                               scope=ldb.SCOPE_ONELEVEL,
1584                               expression="(ou=unique)")
1585         self.assertEqual(len(res11), 0)
1586
1587     def test_onelevel_and_none(self):
1588         """Testing a search"""
1589
1590         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1591                               scope=ldb.SCOPE_ONELEVEL,
1592                               expression="(&(ou=ouX)(y=a))")
1593         self.assertEqual(len(res11), 0)
1594
1595     def test_onelevel_and_idx_record(self):
1596         """Testing a search against the index record"""
1597
1598         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1599                               scope=ldb.SCOPE_ONELEVEL,
1600                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1601         self.assertEqual(len(res11), 0)
1602
1603     def test_onelevel_and_idxone_record(self):
1604         """Testing a search against the index record"""
1605
1606         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1607                               scope=ldb.SCOPE_ONELEVEL,
1608                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1609         self.assertEqual(len(res11), 0)
1610
1611     def test_subtree_unindexable(self):
1612         """Testing a search"""
1613
1614         try:
1615             res11 = self.l.search(base="DC=samba,DC=org",
1616                                   scope=ldb.SCOPE_SUBTREE,
1617                                   expression="(y=b*)")
1618             if hasattr(self, 'IDX') and \
1619                hasattr(self, 'IDXCHECK'):
1620                 self.fail("Should have failed as un-indexed search")
1621
1622             self.assertEqual(len(res11), 9)
1623
1624         except ldb.LdbError as err:
1625             enum = err.args[0]
1626             estr = err.args[1]
1627             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1628             self.assertIn(estr, "ldb FULL SEARCH disabled")
1629
1630     def test_onelevel_only_and_or(self):
1631         """Testing a search (showing that onelevel is not subtree)"""
1632
1633         res11 = self.l.search(base="DC=ORG",
1634                               scope=ldb.SCOPE_ONELEVEL,
1635                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1636         self.assertEqual(len(res11), 0)
1637
1638     def test_onelevel_only_and_or2(self):
1639         """Testing a search (showing that onelevel is not subtree)"""
1640
1641         res11 = self.l.search(base="DC=ORG",
1642                               scope=ldb.SCOPE_ONELEVEL,
1643                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1644         self.assertEqual(len(res11), 0)
1645
1646     def test_onelevel_only_and_or3(self):
1647         """Testing a search (showing that onelevel is not subtree)"""
1648
1649         res11 = self.l.search(base="DC=ORG",
1650                               scope=ldb.SCOPE_ONELEVEL,
1651                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1652         self.assertEqual(len(res11), 0)
1653
1654     def test_onelevel_only_and_or4(self):
1655         """Testing a search (showing that onelevel is not subtree)"""
1656
1657         res11 = self.l.search(base="DC=ORG",
1658                               scope=ldb.SCOPE_ONELEVEL,
1659                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1660         self.assertEqual(len(res11), 0)
1661
1662     def test_onelevel_only_and_or5(self):
1663         """Testing a search (showing that onelevel is not subtree)"""
1664
1665         res11 = self.l.search(base="DC=ORG",
1666                               scope=ldb.SCOPE_ONELEVEL,
1667                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1668         self.assertEqual(len(res11), 0)
1669
1670     def test_onelevel_only_or_and(self):
1671         """Testing a search (showing that onelevel is not subtree)"""
1672
1673         res11 = self.l.search(base="DC=ORG",
1674                               scope=ldb.SCOPE_ONELEVEL,
1675                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1676         self.assertEqual(len(res11), 0)
1677
1678     def test_onelevel_only_large_and_unique(self):
1679         """Testing a search (showing that onelevel is not subtree)"""
1680
1681         res11 = self.l.search(base="DC=ORG",
1682                               scope=ldb.SCOPE_ONELEVEL,
1683                               expression="(&(ou=ou10)(y=a))")
1684         self.assertEqual(len(res11), 0)
1685
1686     def test_onelevel_only_unique(self):
1687         """Testing a search (showing that onelevel is not subtree)"""
1688
1689         res11 = self.l.search(base="DC=ORG",
1690                               scope=ldb.SCOPE_ONELEVEL,
1691                               expression="(ou=ou10)")
1692         self.assertEqual(len(res11), 0)
1693
1694     def test_onelevel_only_unique2(self):
1695         """Testing a search"""
1696
1697         res11 = self.l.search(base="DC=ORG",
1698                               scope=ldb.SCOPE_ONELEVEL,
1699                               expression="(ou=unique)")
1700         self.assertEqual(len(res11), 0)
1701
1702     def test_onelevel_only_and_none(self):
1703         """Testing a search (showing that onelevel is not subtree)"""
1704
1705         res11 = self.l.search(base="DC=ORG",
1706                               scope=ldb.SCOPE_ONELEVEL,
1707                               expression="(&(ou=ouX)(y=a))")
1708         self.assertEqual(len(res11), 0)
1709
1710     def test_onelevel_small_and_or(self):
1711         """Testing a search (showing that onelevel is not subtree)"""
1712
1713         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1714                               scope=ldb.SCOPE_ONELEVEL,
1715                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
1716         self.assertEqual(len(res11), 0)
1717
1718     def test_onelevel_small_and_or2(self):
1719         """Testing a search (showing that onelevel is not subtree)"""
1720
1721         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1722                               scope=ldb.SCOPE_ONELEVEL,
1723                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
1724         self.assertEqual(len(res11), 0)
1725
1726     def test_onelevel_small_and_or3(self):
1727         """Testing a search (showing that onelevel is not subtree)"""
1728
1729         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1730                               scope=ldb.SCOPE_ONELEVEL,
1731                               expression="(&(|(ou=ou1)(ou=ou2))(|(x=y)(y=b)(y=c)))")
1732         self.assertEqual(len(res11), 2)
1733
1734     def test_onelevel_small_and_or4(self):
1735         """Testing a search (showing that onelevel is not subtree)"""
1736
1737         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1738                               scope=ldb.SCOPE_ONELEVEL,
1739                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou1)(ou=ou2)))")
1740         self.assertEqual(len(res11), 2)
1741
1742     def test_onelevel_small_and_or5(self):
1743         """Testing a search (showing that onelevel is not subtree)"""
1744
1745         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1746                               scope=ldb.SCOPE_ONELEVEL,
1747                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou1))")
1748         self.assertEqual(len(res11), 1)
1749
1750     def test_onelevel_small_or_and(self):
1751         """Testing a search (showing that onelevel is not subtree)"""
1752
1753         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1754                               scope=ldb.SCOPE_ONELEVEL,
1755                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1756         self.assertEqual(len(res11), 2)
1757
1758     def test_onelevel_small_large_and_unique(self):
1759         """Testing a search (showing that onelevel is not subtree)"""
1760
1761         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1762                               scope=ldb.SCOPE_ONELEVEL,
1763                               expression="(&(ou=ou9)(y=a))")
1764         self.assertEqual(len(res11), 1)
1765
1766     def test_onelevel_small_unique_elsewhere(self):
1767         """Testing a search (showing that onelevel is not subtree)"""
1768
1769         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1770                               scope=ldb.SCOPE_ONELEVEL,
1771                               expression="(ou=ou10)")
1772         self.assertEqual(len(res11), 0)
1773
1774     def test_onelevel_small_and_none(self):
1775         """Testing a search (showing that onelevel is not subtree)"""
1776
1777         res11 = self.l.search(base="DC=EXAMPLE,DC=ORG",
1778                               scope=ldb.SCOPE_ONELEVEL,
1779                               expression="(&(ou=ouX)(y=a))")
1780         self.assertEqual(len(res11), 0)
1781
1782     def test_subtree_unindexable_presence(self):
1783         """Testing a search"""
1784
1785         try:
1786             res11 = self.l.search(base="DC=samba,DC=org",
1787                                   scope=ldb.SCOPE_SUBTREE,
1788                                   expression="(y=*)")
1789             if hasattr(self, 'IDX') and \
1790                hasattr(self, 'IDXCHECK'):
1791                 self.fail("Should have failed as un-indexed search")
1792
1793             self.assertEqual(len(res11), 24)
1794
1795         except ldb.LdbError as err:
1796             enum = err.args[0]
1797             estr = err.args[1]
1798             self.assertEqual(enum, ldb.ERR_INAPPROPRIATE_MATCHING)
1799             self.assertIn(estr, "ldb FULL SEARCH disabled")
1800
1801     def test_dn_filter_one(self):
1802         """Testing that a dn= filter succeeds
1803         (or fails with disallowDNFilter
1804         set and IDXGUID or (IDX and not IDXONE) mode)
1805         when the scope is SCOPE_ONELEVEL.
1806
1807         This should be made more consistent, but for now lock in
1808         the behaviour
1809
1810         """
1811
1812         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1813                               scope=ldb.SCOPE_ONELEVEL,
1814                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1815         if hasattr(self, 'disallowDNFilter') and \
1816            hasattr(self, 'IDX') and \
1817            (hasattr(self, 'IDXGUID') or
1818             ((not hasattr(self, 'IDXONE') and hasattr(self, 'IDX')))):
1819             self.assertEqual(len(res11), 0)
1820         else:
1821             self.assertEqual(len(res11), 1)
1822
1823     def test_dn_filter_subtree(self):
1824         """Testing that a dn= filter succeeds
1825         (or fails with disallowDNFilter set)
1826         when the scope is SCOPE_SUBTREE"""
1827
1828         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1829                               scope=ldb.SCOPE_SUBTREE,
1830                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1831         if hasattr(self, 'disallowDNFilter') \
1832            and hasattr(self, 'IDX'):
1833             self.assertEqual(len(res11), 0)
1834         else:
1835             self.assertEqual(len(res11), 1)
1836
1837     def test_dn_filter_base(self):
1838         """Testing that (incorrectly) a dn= filter works
1839         when the scope is SCOPE_BASE"""
1840
1841         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1842                               scope=ldb.SCOPE_BASE,
1843                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1844
1845         # At some point we should fix this, but it isn't trivial
1846         self.assertEqual(len(res11), 1)
1847
1848     def test_distinguishedName_filter_one(self):
1849         """Testing that a distinguishedName= filter succeeds
1850         when the scope is SCOPE_ONELEVEL.
1851
1852         This should be made more consistent, but for now lock in
1853         the behaviour
1854
1855         """
1856
1857         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1858                               scope=ldb.SCOPE_ONELEVEL,
1859                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1860         self.assertEqual(len(res11), 1)
1861
1862     def test_distinguishedName_filter_subtree(self):
1863         """Testing that a distinguishedName= filter succeeds
1864         when the scope is SCOPE_SUBTREE"""
1865
1866         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1867                               scope=ldb.SCOPE_SUBTREE,
1868                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1869         self.assertEqual(len(res11), 1)
1870
1871     def test_distinguishedName_filter_base(self):
1872         """Testing that (incorrectly) a distinguishedName= filter works
1873         when the scope is SCOPE_BASE"""
1874
1875         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1876                               scope=ldb.SCOPE_BASE,
1877                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DC=ORG)")
1878
1879         # At some point we should fix this, but it isn't trivial
1880         self.assertEqual(len(res11), 1)
1881
1882     def test_bad_dn_filter_base(self):
1883         """Testing that a dn= filter on an invalid DN works
1884         when the scope is SCOPE_BASE but
1885         returns zero results"""
1886
1887         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1888                               scope=ldb.SCOPE_BASE,
1889                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1890
1891         # At some point we should fix this, but it isn't trivial
1892         self.assertEqual(len(res11), 0)
1893
1894
1895     def test_bad_dn_filter_one(self):
1896         """Testing that a dn= filter succeeds but returns zero
1897         results when the DN is not valid on a SCOPE_ONELEVEL search
1898
1899         """
1900
1901         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1902                               scope=ldb.SCOPE_ONELEVEL,
1903                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1904         self.assertEqual(len(res11), 0)
1905
1906     def test_bad_dn_filter_subtree(self):
1907         """Testing that a dn= filter succeeds but returns zero
1908         results when the DN is not valid on a SCOPE_SUBTREE search
1909
1910         """
1911
1912         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1913                               scope=ldb.SCOPE_SUBTREE,
1914                               expression="(dn=OU=OU1,DC=SAMBA,DCXXXX)")
1915         self.assertEqual(len(res11), 0)
1916
1917     def test_bad_distinguishedName_filter_base(self):
1918         """Testing that a distinguishedName= filter on an invalid DN works
1919         when the scope is SCOPE_BASE but
1920         returns zero results"""
1921
1922         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1923                               scope=ldb.SCOPE_BASE,
1924                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1925
1926         # At some point we should fix this, but it isn't trivial
1927         self.assertEqual(len(res11), 0)
1928
1929
1930     def test_bad_distinguishedName_filter_one(self):
1931         """Testing that a distinguishedName= filter succeeds but returns zero
1932         results when the DN is not valid on a SCOPE_ONELEVEL search
1933
1934         """
1935
1936         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1937                               scope=ldb.SCOPE_ONELEVEL,
1938                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1939         self.assertEqual(len(res11), 0)
1940
1941     def test_bad_distinguishedName_filter_subtree(self):
1942         """Testing that a distinguishedName= filter succeeds but returns zero
1943         results when the DN is not valid on a SCOPE_SUBTREE search
1944
1945         """
1946
1947         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1948                               scope=ldb.SCOPE_SUBTREE,
1949                               expression="(distinguishedName=OU=OU1,DC=SAMBA,DCXXXX)")
1950         self.assertEqual(len(res11), 0)
1951
1952     def test_bad_dn_search_base(self):
1953         """Testing with a bad base DN (SCOPE_BASE)"""
1954
1955         try:
1956             res11 = self.l.search(base="OU=OU1,DC=SAMBA,DCXXX",
1957                                   scope=ldb.SCOPE_BASE)
1958             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1959         except ldb.LdbError as err:
1960             enum = err.args[0]
1961             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1962
1963
1964     def test_bad_dn_search_one(self):
1965         """Testing with a bad base DN (SCOPE_ONELEVEL)"""
1966
1967         try:
1968             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1969                                   scope=ldb.SCOPE_ONELEVEL)
1970             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1971         except ldb.LdbError as err:
1972             enum = err.args[0]
1973             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1974
1975     def test_bad_dn_search_subtree(self):
1976         """Testing with a bad base DN (SCOPE_SUBTREE)"""
1977
1978         try:
1979             res11 = self.l.search(base="DC=SAMBA,DCXXXX",
1980                                   scope=ldb.SCOPE_SUBTREE)
1981             self.fail("Should have failed with ERR_INVALID_DN_SYNTAX")
1982         except ldb.LdbError as err:
1983             enum = err.args[0]
1984             self.assertEqual(enum, ldb.ERR_INVALID_DN_SYNTAX)
1985
1986
1987
1988 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """SearchTests with self.prefix set to MDB_PREFIX and self.index
       set to MDB_INDEX_OBJ; skipped when HAVE_LMDB is "0" in the
       environment"""

    def setUp(self):
        lmdb_available = os.environ.get('HAVE_LMDB', '1') != '0'
        if not lmdb_available:
            self.skipTest("No lmdb backend")
        # Both attributes are assigned before the parent setUp() runs.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super().setUp()

    def tearDown(self):
        super().tearDown()
2000
2001
class IndexedSearchTests(SearchTests):
    """Run the search tests with an @INDEXLIST record that indexes
       the x, y and ou attributes, to ensure the index doesn't break
       things"""

    def setUp(self):
        super().setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Tests probe for this attribute with hasattr(self, 'IDX').
        self.IDX = True
2011
2012
class IndexedCheckSearchTests(IndexedSearchTests):
    """IndexedSearchTests with the IDXCHECK marker set
       (full scan disabled)"""

    def setUp(self):
        # Assigned before the parent setUp() runs; presumably the base
        # setUp reads it to disable full scans (not visible here).
        self.IDXCHECK = True
        super().setUp()
2020
2021
class IndexedSearchDnFilterTests(SearchTests):
    """Run the search tests with disallowDNFilter set in @OPTIONS and
       an @INDEXLIST record indexing x, y and ou, to ensure the index
       doesn't break things"""

    def setUp(self):
        super().setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        self.IDX = True
2035
2036
class IndexedAndOneLevelSearchTests(SearchTests):
    """Run the search tests with an @INDEXLIST record that indexes
       x, y and ou and also sets @IDXONE, to ensure the index doesn't
       break things"""

    def setUp(self):
        super().setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Markers probed by the tests via hasattr().
        self.IDX = True
        self.IDXONE = True
2048
2049
class IndexedCheckedAndOneLevelSearchTests(IndexedAndOneLevelSearchTests):
    """IndexedAndOneLevelSearchTests with the IDXCHECK marker set
       (full scan disabled)"""

    def setUp(self):
        # Assigned before the parent setUp() runs; presumably the base
        # setUp reads it to disable full scans (not visible here).
        self.IDXCHECK = True
        super().setUp()
2057
2058
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Run the search tests with disallowDNFilter and
       checkBaseOnSearch set in @OPTIONS and an @INDEXLIST record that
       indexes x, y and ou and also sets @IDXONE, to ensure the index
       doesn't break things"""

    def setUp(self):
        super().setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        self.IDX = True
        self.IDXONE = True
2076
2077
class GUIDIndexedSearchTests(SearchTests):
    """Run the search tests with self.index describing an @INDEXLIST
       record that indexes x, y and ou and sets @IDXGUID and
       @IDX_DN_GUID, to ensure the index doesn't break things"""

    def setUp(self):
        # self.index is assigned before the parent setUp() runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super().setUp()

        self.IDXGUID = True
2090
2091
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Run the search tests with a GUID index description in
       self.index and with disallowDNFilter and checkBaseOnSearch set
       in @OPTIONS, to ensure the index doesn't break things"""

    def setUp(self):
        # self.index is assigned before the parent setUp() runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super().setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Markers probed by the tests via hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXGUID = True
2109
2110
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Run the search tests with a GUID index description that also
       sets @IDXONE in self.index, and with disallowDNFilter and
       checkBaseOnSearch set in @OPTIONS, to ensure the index doesn't
       break things"""

    def setUp(self):
        # self.index is assigned before the parent setUp() runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXONE": [b"1"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super().setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE",
                          "checkBaseOnSearch": "TRUE"}
        self.l.add(options_record)

        # Markers probed by the tests via hasattr().
        self.disallowDNFilter = True
        self.checkBaseOnSearch = True
        self.IDX = True
        self.IDXGUID = True
        self.IDXONE = True
2130
2131
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """GUIDIndexedSearchTests with self.prefix set to MDB_PREFIX;
       skipped when HAVE_LMDB is "0" in the environment"""

    def setUp(self):
        lmdb_available = os.environ.get('HAVE_LMDB', '1') != '0'
        if not lmdb_available:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super().setUp()

    def tearDown(self):
        super().tearDown()
2142
2143
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """GUIDIndexedDNFilterSearchTests with self.prefix set to
       MDB_PREFIX; skipped when HAVE_LMDB is "0" in the environment"""

    def setUp(self):
        lmdb_available = os.environ.get('HAVE_LMDB', '1') != '0'
        if not lmdb_available:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super().setUp()

    def tearDown(self):
        super().tearDown()
2154
2155
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """GUIDAndOneLevelIndexedSearchTests with self.prefix set to
       MDB_PREFIX; skipped when HAVE_LMDB is "0" in the environment"""

    def setUp(self):
        lmdb_available = os.environ.get('HAVE_LMDB', '1') != '0'
        if not lmdb_available:
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super().setUp()

    def tearDown(self):
        super().tearDown()
2166
2167
class AddModifyTests(LdbBaseTest):
    """Add, delete and rename tests against a freshly created ldb.

       Every test starts from a database opened with the rdn_name
       module that holds DC=SAMBA,DC=ORG and an @ATTRIBUTES record
       marking objectUUID as UNIQUE_INDEX.  If self.index exists when
       setUp() runs, that record is added first."""

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def setUp(self):
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])

        # Only the presence of self.index is optional.  An
        # AttributeError raised from inside add() itself must not be
        # silently swallowed.
        if hasattr(self, 'index'):
            self.l.add(self.index)

        self.l.add({"dn": "DC=SAMBA,DC=ORG",
                    "name": b"samba.org",
                    "objectUUID": b"0123456789abcdef"})
        self.l.add({"dn": "@ATTRIBUTES",
                    "objectUUID": "UNIQUE_INDEX"})

    def _add_admins(self, dn, object_uuid):
        """Add the standard "Admins" record (x=z, y=a) at dn with the
        given objectUUID"""
        self.l.add({"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": object_uuid})

    def _assert_ldb_error(self, expected, fail_msg, func, *args):
        """Call func(*args) and require an ldb.LdbError whose first
        argument equals expected; fail with fail_msg if nothing is
        raised"""
        try:
            func(*args)
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, expected)
        else:
            self.fail(fail_msg)

    def _assert_uuid_at(self, uuid, dn):
        """Require that a subtree search for objectUUID=uuid returns
        exactly one entry and that its DN is dn"""
        res = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression="(objectUUID=%s)" % uuid)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)

    def test_add_dup(self):
        """Adding a second record at an existing DN must fail with
        ERR_ENTRY_ALREADY_EXISTS"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._assert_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed adding duplicate entry",
                               self._add_admins,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               b"0123456789abcde2")

    def test_add_bad(self):
        """Adding a record with an invalid DN must fail with
        ERR_INVALID_DN_SYNTAX"""
        self._assert_ldb_error(ldb.ERR_INVALID_DN_SYNTAX,
                               "Should have failed adding entry with invalid DN",
                               self._add_admins,
                               "BAD,DC=SAMBA,DC=ORG",
                               b"0123456789abcde1")

    def test_add_del_add(self):
        """A DN can be added again after it has been deleted"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_add(self):
        """A DN can be added again after the record there has been
        renamed away"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_fail_move_move(self):
        """A rename onto an occupied DN fails; after the occupant is
        renamed away the same rename succeeds and objectUUID searches
        find both records at their new DNs"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._assert_uuid_at("0123456789abcde1", "OU=DUP,DC=SAMBA,DC=ORG")
        self._assert_uuid_at("0123456789abcde2", "OU=DUP2,DC=SAMBA,DC=ORG")

        self._assert_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed on duplicate DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")

        self._assert_uuid_at("0123456789abcde1", "OU=DUP2,DC=SAMBA,DC=ORG")
        self._assert_uuid_at("0123456789abcde2", "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        """Renaming a DN that does not exist must fail with
        ERR_NO_SUCH_OBJECT"""
        self._assert_ldb_error(ldb.ERR_NO_SUCH_OBJECT,
                               "Should have failed on missing",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_missing2(self):
        """Renaming a missing DN onto an existing one must fail with
        ERR_NO_SUCH_OBJECT"""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._assert_ldb_error(ldb.ERR_NO_SUCH_OBJECT,
                               "Should have failed on missing",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_bad(self):
        """Renaming from an invalid source DN must fail with
        ERR_INVALID_DN_SYNTAX"""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._assert_ldb_error(ldb.ERR_INVALID_DN_SYNTAX,
                               "Should have failed on invalid DN",
                               self.l.rename,
                               "OUXDUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_bad2(self):
        """Renaming to an invalid target DN must fail with
        ERR_INVALID_DN_SYNTAX"""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        # The failure text names the condition under test (an invalid
        # DN), matching test_move_bad, rather than "missing".
        self._assert_ldb_error(ldb.ERR_INVALID_DN_SYNTAX,
                               "Should have failed on invalid DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OUXDUP2,DC=SAMBA,DC=ORG")

    def test_move_fail_move_add(self):
        """After a failed rename onto an occupied DN, the occupant can
        be renamed away and a new record added at the freed DN"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._assert_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed on duplicate DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde3")
2366
2367
class AddModifyTestsLmdb(AddModifyTests):
    """Run the AddModifyTests suite with the mdb:// prefix and index."""

    def setUp(self):
        # HAVE_LMDB set to '0' in the environment skips the test; an
        # unset variable counts as "available".
        lmdb_flag = os.environ.get('HAVE_LMDB', '1')
        if lmdb_flag == '0':
            self.skipTest("No lmdb backend")
        self.prefix, self.index = MDB_PREFIX, MDB_INDEX_OBJ
        super().setUp()

    def tearDown(self):
        super().tearDown()
2379
2380
class IndexedAddModifyTests(AddModifyTests):
    """Run the add/modify tests with an index configured, to ensure
       the index doesn't break things"""

    @staticmethod
    def _admins_record(dn, guid):
        # Record shape shared by the duplicate-detection tests below;
        # only the DN and the objectUUID vary.
        return {"dn": dn,
                "name": b"Admins",
                "x": "z", "y": "a",
                "objectUUID": guid}

    def setUp(self):
        # An index already present on the instance (set by a subclass)
        # wins; otherwise use a plain attribute index.
        fallback_index = {"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID", b"z"],
                          "@IDXONE": [b"1"]}
        if not hasattr(self, 'index'):
            self.index = fallback_index
        super().setUp()

    def test_duplicate_GUID(self):
        # Nothing is added here first, so the objectUUID presumably
        # clashes with an entry created by the inherited setUp
        # (TODO confirm).
        try:
            self.l.add(self._admins_record("OU=DUPGUID,DC=SAMBA,DC=ORG",
                                           b"0123456789abcdef"))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding duplicate GUID")

    def test_duplicate_name_dup_GUID(self):
        # The same DN and the same objectUUID are added twice; the
        # expected error is ERR_ENTRY_ALREADY_EXISTS.
        dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        guid = b"a123456789abcdef"
        self.l.add(self._admins_record(dn, guid))
        try:
            self.l.add(self._admins_record(dn, guid))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding duplicate GUID")

    def test_duplicate_name_dup_GUID2(self):
        dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        rejected_guid = b"aaa3456789abcdef"
        self.l.add(self._admins_record(dn, b"abc3456789abcdef"))
        try:
            self.l.add(self._admins_record(dn, rejected_guid))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding duplicate DN")

        # Checking the GUID didn't stick in the index
        self.l.add(self._admins_record("OU=DUP,DC=SAMBA,DC=ORG",
                                       rejected_guid))

    def test_add_dup_guid_add(self):
        shared_guid = b"0123456789abcde1"
        second_dn = "OU=DUP2,DC=SAMBA,DC=ORG"
        self.l.add(self._admins_record("OU=DUP,DC=SAMBA,DC=ORG",
                                       shared_guid))
        try:
            self.l.add(self._admins_record(second_dn, shared_guid))
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        # The rejected DN is then added again with an unused objectUUID
        self.l.add(self._admins_record(second_dn, b"0123456789abcde2"))

    def test_duplicate_index_values(self):
        # Two records sharing the same value for "z" are both added
        entries = (("OU=DIV1,DC=SAMBA,DC=ORG", b"0123456789abcdff"),
                   ("OU=DIV2,DC=SAMBA,DC=ORG", b"0123456789abcdfd"))
        for dn, guid in entries:
            self.l.add({"dn": dn,
                        "name": b"Admins",
                        "z": "1",
                        "objectUUID": guid})
2469
2470
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Run the indexed add/modify tests with a GUID index, to ensure
       the index doesn't break things"""

    def setUp(self):
        # Unconditionally select a GUID index on objectUUID before the
        # inherited setUp runs.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXONE": [b"1"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super().setUp()
2482
2483
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super().setUp()
        # Leave a transaction open for the test body to run in
        self.l.transaction_start()

    def tearDown(self):
        # Commit the transaction opened in setUp, then delegate to the
        # inherited tearDown
        self.l.transaction_commit()
        super().tearDown()
2494
2495
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super().setUp()
        # Leave a transaction open for the test body to run in
        self.l.transaction_start()

    def tearDown(self):
        # Commit the transaction opened in setUp, then delegate to the
        # inherited tearDown
        self.l.transaction_commit()
        super().tearDown()
2506
2507
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Run the GUID-indexed add/modify tests with the mdb:// prefix."""

    def setUp(self):
        # HAVE_LMDB set to '0' in the environment skips the test; an
        # unset variable counts as "available".
        lmdb_flag = os.environ.get('HAVE_LMDB', '1')
        if lmdb_flag == '0':
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super().setUp()

    def tearDown(self):
        super().tearDown()
2518
2519
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Run the in-transaction GUID-indexed tests with the mdb:// prefix."""

    def setUp(self):
        # HAVE_LMDB set to '0' in the environment skips the test; an
        # unset variable counts as "available".
        lmdb_flag = os.environ.get('HAVE_LMDB', '1')
        if lmdb_flag == '0':
            self.skipTest("No lmdb backend")
        self.prefix = MDB_PREFIX
        super().setUp()

    def tearDown(self):
        super().tearDown()
2530
2531
class BadIndexTests(LdbBaseTest):
    """Check that searches through the index still give the expected
    results after @ATTRIBUTES changes (unique index, case folding) and
    after a failed modify.

    A plain attribute index is used unless ``self.IDXGUID`` exists before
    setUp() runs, in which case a GUID index on objectUUID is configured.
    """

    def setUp(self):
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        # Registered straight away so the scratch directory is removed
        # after the test, even if a later step of setUp() raises.
        self.addCleanup(shutil.rmtree, self.testdir, ignore_errors=True)
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def _add_xyz(self, y_x, y_y, y_z):
        # Add the records x=x, x=y and x=z (in that order) with the given
        # values for the indexed attribute "y".
        for rdn, guid, y_value in (("x", b"0123456789abcde1", y_x),
                                   ("y", b"0123456789abcde2", y_y),
                                   ("z", b"0123456789abcde3", y_z)):
            self.ldb.add({"dn": "x=%s,dc=samba,dc=org" % rdn,
                          "objectUUID": guid,
                          "y": y_value})

    def test_unique(self):
        self._add_xyz("1", "1", "1")

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass
        else:
            self.fail()

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        self._add_xyz("1", "1", "1")

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result.
        # Ignoring the error is deliberate: the commit below is what is
        # expected to fail.
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_OPERATIONS_ERROR)
        else:
            self.fail()

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        self._add_xyz("a", "A", ["a", "A"])

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        self._add_xyz("a", "A", ["a", "A"])

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_modify_transaction(self):
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "2",
                      "z": "2"})

        res = self.ldb.search(expression="(y=2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "UNIQUE_INDEX"})

        self.ldb.transaction_start()

        # Delete "y" (present) together with "not-here" (absent): the
        # modify as a whole is expected to fail on the absent attribute.
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
        m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
        m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")

        try:
            self.ldb.modify(m)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_ATTRIBUTE)
        else:
            self.fail()

        try:
            self.ldb.transaction_commit()
            # We should fail here, but we want to be sure
            # we fail below

        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_OPERATIONS_ERROR)

        # The index should still be pointing to x=y
        res = self.ldb.search(expression="(y=2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)

        try:
            self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
                          "objectUUID": b"0123456789abcde2",
                          "y": "2"})
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Added unique attribute twice")

        res = self.ldb.search(expression="(y=2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")

    def tearDown(self):
        super(BadIndexTests, self).tearDown()
2737
2738
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # The inherited setUp checks for this attribute to decide whether
        # to configure a GUID index, so it must exist before delegating.
        self.IDXGUID = True
        super().setUp()
2746
2747
class GUIDBadIndexTestsLmdb(BadIndexTests):
    """Test Bad index things with GUID index mode and the mdb:// prefix"""

    def setUp(self):
        # HAVE_LMDB set to '0' in the environment skips the test; an
        # unset variable counts as "available".
        lmdb_flag = os.environ.get('HAVE_LMDB', '1')
        if lmdb_flag == '0':
            self.skipTest("No lmdb backend")
        self.prefix, self.index = MDB_PREFIX, MDB_INDEX_OBJ
        # Must exist before the inherited setUp, which checks for it
        self.IDXGUID = True
        super().setUp()

    def tearDown(self):
        super().tearDown()
2760
2761
class BatchModeTests(LdbBaseTest):
    """Check transaction behaviour of a database opened with the
    "batch_mode:1" option."""

    def setUp(self):
        super(BatchModeTests, self).setUp()
        self.testdir = tempdir()
        # Registered straight away so the scratch directory is removed
        # after the test, even if a later step of setUp() raises.
        self.addCleanup(shutil.rmtree, self.testdir, ignore_errors=True)
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(),
                           flags=self.flags(),
                           options=["batch_mode:1"])
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def test_modify_transaction(self):
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "2",
                      "z": "2"})

        res = self.ldb.search(expression="(y=2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "UNIQUE_INDEX"})

        self.ldb.transaction_start()

        # Delete "y" (present) together with "not-here" (absent): the
        # modify as a whole is expected to fail on the absent attribute.
        m = ldb.Message()
        m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
        m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
        m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")

        try:
            self.ldb.modify(m)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_ATTRIBUTE)
        else:
            self.fail()

        # After the failed modify the commit itself must be refused
        try:
            self.ldb.transaction_commit()
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_OPERATIONS_ERROR)
        else:
            self.fail("Commit should have failed as we were in batch mode")

    def tearDown(self):
        super(BatchModeTests, self).tearDown()
2817
2818
class DnTests(TestCase):
    """Tests for ldb.Dn and for LDIF parsing/printing, run against an
    Ldb object created without a URL."""

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del self.ldb

    def test_set_dn_invalid(self):
        x = ldb.Message()

        # Assigning a plain str to Message.dn is expected to raise
        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold_2(self):
        # Only the "dc" component is expected to have its value folded;
        # the "bar" value keeps its case.
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", x.parent().__str__())

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        # The empty DN counts as valid too
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add_child_str(self):
        # Same as test_add_child, but passing the child as a plain str
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base_str(self):
        # Same as test_add_base, but passing the base as a plain str
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x) - 1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        # Each parsed item is indexable; the Message sits at index 1
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_print_ldif(self):
        ldif = ("dn: dc=foo27\n"
                "foo: foo\n"
                "\n")
        self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
        self.msg["foo"] = [b"foo"]
        self.assertEqual(ldif,
                         self.ldb.write_ldif(self.msg,
                                             ldb.CHANGETYPE_NONE))

    def test_print_ldif_binary(self):
        # this also confirms that ldb flags are set even without a URL
        self.ldb = ldb.Ldb(flags=ldb.FLG_SHOW_BINARY)
        ldif = ("dn: dc=foo27\n"
                "foo: f\n"
                "öö\n"
                "\n")
        self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
        self.msg["foo"] = ["f\nöö"]
        self.assertEqual(ldif,
                         self.ldb.write_ldif(self.msg,
                                             ldb.CHANGETYPE_NONE))

    def test_print_ldif_no_base64_bad(self):
        # With FLAG_FORCE_NO_BASE64_LDIF set on the element, the embedded
        # newline is expected to be written out raw.
        ldif = ("dn: dc=foo27\n"
                "foo: f\n"
                "öö\n"
                "\n")
        self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
        self.msg["foo"] = ["f\nöö"]
        self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
        self.assertEqual(ldif,
                         self.ldb.write_ldif(self.msg,
                                             ldb.CHANGETYPE_NONE))

    def test_print_ldif_no_base64_good(self):
        ldif = ("dn: dc=foo27\n"
                "foo: föö\n"
                "\n")
        self.msg = ldb.Message(ldb.Dn(self.ldb, "dc=foo27"))
        self.msg["foo"] = ["föö"]
        self.msg["foo"].set_flags(ldb.FLAG_FORCE_NO_BASE64_LDIF)
        self.assertEqual(ldif,
                         self.ldb.write_ldif(self.msg,
                                             ldb.CHANGETYPE_NONE))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        # Same as canonical_str(), but with "\n" before the last part
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        # A DN counts as a child of itself
        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        # Same checks as test_ldb_is_child_of, with the parent as a str
        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        # Out-of-range indexes give None
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range indexes must be checked on get_component_value
        # itself, not on get_component_name
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # An out-of-range index raises and leaves the DN unchanged
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # "," and "+" in a value come back escaped in the string form
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        # Both str and bytes values are accepted; bytes come back
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    def test_get_casefold(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
3112
3113
class LdbMsgTests(TestCase):
    """Tests for ldb.Message, via both the bytes API and the .text view."""

    def setUp(self):
        super(LdbMsgTests, self).setUp()
        # Every test starts from a fresh, empty message.
        self.msg = ldb.Message()

    def test_init_dn(self):
        # A Dn passed to the constructor becomes the message's dn.
        self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
        self.assertEqual("dc=foo27", str(self.msg.dn))

    def test_iter_items(self):
        # An empty message has no items; setting the dn adds exactly one.
        self.assertEqual(0, len(self.msg.items()))
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
        self.assertEqual(1, len(self.msg.items()))

    def test_items(self):
        # items() returns (name, element) pairs in insertion order, with the
        # dn listed first once one has been set.
        self.msg["foo"] = ["foo"]
        self.msg["bar"] = ["bar"]
        try:
            items = self.msg.items()
        except Exception as err:
            # Report the cause rather than failing with an empty message.
            self.fail("Message.items() raised %r" % (err,))
        self.assertEqual([("foo", ldb.MessageElement(["foo"])),
                          ("bar", ldb.MessageElement(["bar"]))],
                         items)

        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=test")
        try:
            items = self.msg.items()
        except Exception as err:
            self.fail("Message.items() raised %r" % (err,))
        self.assertEqual([("dn", ldb.Dn(ldb.Ldb(), "dc=test")),
                          ("foo", ldb.MessageElement(["foo"])),
                          ("bar", ldb.MessageElement(["bar"]))],
                         items)

    def test_repr(self):
        # Either ordering of the 'dn' and 'dc' keys is accepted in the repr.
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
        self.msg["dc"] = b"foo"
        self.assertIn(repr(self.msg), [
            "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
            "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
        ])
        self.assertIn(repr(self.msg.text), [
            "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
            "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
        ])

    def test_len(self):
        self.assertEqual(0, len(self.msg))

    def test_notpresent(self):
        # Looking up a missing attribute by subscript raises KeyError.
        self.assertRaises(KeyError, lambda: self.msg["foo"])

    def test_invalid(self):
        # A non-string key must raise TypeError; a KeyError here is turned
        # into an explicit test failure.
        try:
            self.assertRaises(TypeError, lambda: self.msg[42])
        except KeyError:
            self.fail()

    def test_del(self):
        # Deleting an attribute that was never set is expected not to raise.
        del self.msg["foo"]

    def test_add(self):
        self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_add_text(self):
        self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_elements_empty(self):
        self.assertEqual([], self.msg.elements())

    def test_elements(self):
        # elements() of the message and of its .text view must agree with
        # the element that was added.
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(el)
        self.assertEqual([el], self.msg.elements())
        self.assertEqual([el.text], self.msg.text.elements())

    def test_add_value(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_multiple(self):
        # Two values under one attribute still count as a single element.
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo", b"bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))

    def test_add_value_multiple_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo", "bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))

    def test_set_value(self):
        # Assigning to an existing attribute replaces its values.
        self.msg["foo"] = [b"fool"]
        self.assertEqual([b"fool"], list(self.msg["foo"]))
        self.msg["foo"] = [b"bar"]
        self.assertEqual([b"bar"], list(self.msg["foo"]))

    def test_set_value_text(self):
        self.msg["foo"] = ["fool"]
        self.assertEqual(["fool"], list(self.msg.text["foo"]))
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text["foo"]))

    def test_keys(self):
        # keys() lists "dn" first, then attributes in insertion order.
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = [b"bla"]
        self.msg["bar"] = [b"bla"]
        self.assertEqual(["dn", "foo", "bar"], list(self.msg.keys()))

    def test_keys_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = ["bla"]
        self.msg["bar"] = ["bla"]
        self.assertEqual(["dn", "foo", "bar"], list(self.msg.text.keys()))

    def test_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))

    def test_get_dn(self):
        # The dn is also reachable through get("dn").
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))

    def test_dn_text(self):
        # Setting dn through the .text view is visible on both views.
        self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))
        self.assertEqual("@BASEINFO", str(self.msg.text.dn))

    def test_get_dn_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
        self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))

    def test_get_invalid(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertRaises(TypeError, self.msg.get, 42)

    def test_get_other(self):
        # get() with idx returns a single value, or the default (None unless
        # given) when the index is out of range.
        self.msg["foo"] = [b"bar"]
        self.assertEqual(b"bar", self.msg.get("foo")[0])
        self.assertEqual(b"bar", self.msg.get("foo", idx=0))
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_other_text(self):
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text.get("foo")))
        self.assertEqual("bar", self.msg.text.get("foo")[0])
        self.assertEqual("bar", self.msg.text.get("foo", idx=0))
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_default(self):
        self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))

    def test_get_default_text(self):
        self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))

    def test_get_unknown(self):
        self.assertEqual(None, self.msg.get("lalalala"))

    def test_get_unknown_text(self):
        self.assertEqual(None, self.msg.text.get("lalalala"))

    def test_contains(self):
        self.msg['foo'] = ['bar']
        self.assertIn('foo', self.msg)

        self.msg['Foo'] = ['bar']
        self.assertIn('Foo', self.msg)

    def test_contains_case(self):
        # Membership checks are expected to ignore attribute-name case.
        self.msg['foo'] = ['bar']
        self.assertIn('Foo', self.msg)

        self.msg['Foo'] = ['bar']
        self.assertIn('foo', self.msg)

    def test_contains_dn(self):
        # 'dn' is reported as present even though no dn has been set.
        self.assertIn('dn', self.msg)

    def test_contains_dn_case(self):
        self.assertIn('DN', self.msg)

    def test_contains_invalid(self):
        self.assertRaises(TypeError, lambda: None in self.msg)

    def test_msg_diff(self):
        # The two parsed records differ only in "baz", so the diff carries a
        # single element and no "foo".
        l = ldb.Ldb()
        msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
        msg1 = next(msgs)[1]
        msg2 = next(msgs)[1]
        msgdiff = l.msg_diff(msg1, msg2)
        self.assertEqual("foo=bar", str(msgdiff.get("dn")))
        self.assertRaises(KeyError, lambda: msgdiff["foo"])
        self.assertEqual(1, len(msgdiff))

    def test_equal_empty(self):
        msg1 = ldb.Message()
        msg2 = ldb.Message()
        self.assertEqual(msg1, msg2)

    def test_equal_simplel(self):
        # Messages compare equal while dn and attributes match, and unequal
        # once an attribute value differs.
        db = ldb.Ldb()
        msg1 = ldb.Message()
        msg1.dn = ldb.Dn(db, "foo=bar")
        msg2 = ldb.Message()
        msg2.dn = ldb.Dn(db, "foo=bar")
        self.assertEqual(msg1, msg2)
        msg1['foo'] = b'bar'
        msg2['foo'] = b'bar'
        self.assertEqual(msg1, msg2)
        msg2['foo'] = b'blie'
        self.assertNotEqual(msg1, msg2)
        # Re-assigning the same value; nothing is asserted afterwards.
        msg2['foo'] = b'blie'

    def test_from_dict(self):
        rec = {"dn": "dc=fromdict",
               "a1": [b"a1-val1", b"a1-val1"]}
        l = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(l, rec, flags)
            self.assertEqual(rec["a1"], list(m["a1"]))
            self.assertEqual(flags, m["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_from_dict_text(self):
        rec = {"dn": "dc=fromdict",
               "a1": ["a1-val1", "a1-val1"]}
        l = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(l, rec, flags)
            self.assertEqual(rec["a1"], list(m.text["a1"]))
            self.assertEqual(flags, m.text["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": ["a1-val1", "a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_copy_add_message_element(self):
        # Elements copied to another message, by subscript assignment or by
        # add(), must compare equal to the originals.
        m = ldb.Message()
        m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])

    def test_copy_add_message_element_text(self):
        m = ldb.Message()
        m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m.text["1"])
        self.assertEqual(mto["2"], m.text["2"])
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto.text["1"], m.text["1"])
        self.assertEqual(mto.text["2"], m.text["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
3406
3407
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement and its .text view."""

    def test_cmp_element(self):
        # Elements with equal values compare equal; different values do not.
        first = ldb.MessageElement([b"foo"])
        twin = ldb.MessageElement([b"foo"])
        other = ldb.MessageElement([b"bzr"])
        self.assertEqual(first, twin)
        self.assertNotEqual(first, other)

    def test_cmp_element_text(self):
        # A bytes-built element equals one built from the matching text.
        from_bytes = ldb.MessageElement([b"foo"])
        from_text = ldb.MessageElement(["foo"])
        self.assertEqual(from_bytes, from_text)

    def test_create_iterable(self):
        # Iterating yields bytes; iterating the .text view yields str.
        element = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(element))
        self.assertEqual(["foo"], list(element.text))

    def test_repr(self):
        single = ldb.MessageElement([b"foo"])
        self.assertEqual("MessageElement([b'foo'])", repr(single))
        self.assertEqual("MessageElement([b'foo']).text", repr(single.text))

        double = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(double))
        # Multiple values are joined with a bare comma (no space).
        self.assertEqual("MessageElement([b'foo',b'bla'])", repr(double))
        self.assertEqual("MessageElement([b'foo',b'bla']).text",
                         repr(double.text))

    def test_get_item(self):
        # Positive and negative indices work; out of range raises IndexError.
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", element[0])
        self.assertEqual(b"bar", element[1])
        self.assertEqual(b"bar", element[-1])
        with self.assertRaises(IndexError):
            element[45]

    def test_get_item_text(self):
        element = ldb.MessageElement(["foo", "bar"])
        text_view = element.text
        self.assertEqual("foo", text_view[0])
        self.assertEqual("bar", text_view[1])
        self.assertEqual("bar", text_view[-1])
        # NOTE(review): the out-of-range check indexes the element itself,
        # not its .text view -- confirm whether .text was intended.
        with self.assertRaises(IndexError):
            element[45]

    def test_len(self):
        element = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(element))

    def test_eq(self):
        # Equality depends on the full list of values.
        pair_a = ldb.MessageElement([b"foo", b"bar"])
        pair_b = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(pair_b, pair_a)

        single_a = ldb.MessageElement([b"foo"])
        self.assertNotEqual(pair_b, single_a)

        single_b = ldb.MessageElement([b"foo"])
        self.assertEqual(single_b, single_a)

    def test_extended(self):
        # Flags and name passed to the constructor do not show up in repr.
        element = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.assertEqual("MessageElement([b'456'])", repr(element))
        self.assertEqual("MessageElement([b'456']).text", repr(element.text))

    def test_bad_text(self):
        # Bytes that are not valid text must fail on access through .text.
        element = ldb.MessageElement(b'\xba\xdd')
        text_view = element.text
        with self.assertRaises(UnicodeDecodeError):
            text_view.__getitem__(0)
3471
3472
class LdbResultTests(LdbBaseTest):
    """Tests for search results, including visibility across transactions."""

    def setUp(self):
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        # Subclasses may define self.index; when it is absent the database
        # is used without it.
        try:
            self.l.add(self.index)
        except AttributeError:
            pass
        # The 13 records every test in this class searches over.
        records = [
            ("DC=SAMBA,DC=ORG", b"samba.org", b"0123456789abcde0"),
            ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", b"0123456789abcde1"),
            ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", b"0123456789abcde2"),
            ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", b"0123456789abcde3"),
            ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", b"0123456789abcde4"),
            ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", b"0123456789abcde5"),
            ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", b"0123456789abcde6"),
            ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", b"0123456789abcde7"),
            ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", b"0123456789abcde8"),
            ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", b"0123456789abcde9"),
            ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", b"0123456789abcdea"),
            ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", b"0123456789abcdeb"),
            ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", b"0123456789abcdec"),
        ]
        for dn, name, object_uuid in records:
            self.l.add({"dn": dn, "name": name, "objectUUID": object_uuid})

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def _make_pipe(self):
        # Create a pipe whose two ends are closed when the test finishes,
        # whether it passed or failed, so descriptors are not leaked.
        (read_fd, write_fd) = os.pipe()
        self.addCleanup(os.close, read_fd)
        self.addCleanup(os.close, write_fd)
        return (read_fd, write_fd)

    def test_return_type(self):
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        # Only checks that the attribute can be read.
        res = self.l.search()
        msgs = res.msgs

    def test_get_controls(self):
        res = self.l.search()
        controls = res.controls

    def test_get_referals(self):
        res = self.l.search()
        referals = res.referals

    def test_iter_msgs(self):
        found = False
        for l in self.l.search().msgs:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        res = self.l.search().controls
        it = iter(res)

    def test_create_control(self):
        # An unknown control name is rejected; "relax:1" parses to a
        # critical control with the OID asserted below.
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        res = self.l.search().referals
        it = iter(res)

    def test_search_sequence_msgs(self):
        found = False
        res = self.l.search().msgs

        # Index explicitly: this test exercises sequence access, not
        # iteration.
        for i in range(0, len(res)):
            l = res[i]
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        found = False
        res = self.l.search()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        found = False
        res = self.l.search_iterator()

        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction

    def test_search_against_trans(self):
        found11 = False

        # r1/w1 carries child -> parent messages, r2/w2 parent -> child.
        (r1, w1) = self._make_pipe()

        (r2, w2) = self._make_pipe()

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1 carries child -> parent messages, r2/w2 parent -> child.
        (r1, w1) = self._make_pipe()

        (r2, w2) = self._make_pipe()

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del(res)
            del(self.l)
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for l in res:
            if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
3740
3741
class LdbResultTestsLmdb(LdbResultTests):
    """Run the LdbResultTests suite against the mdb:// backend."""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means the backend is unavailable;
        # an unset variable is treated as available.
        lmdb_flag = os.environ.get('HAVE_LMDB', '1')
        if lmdb_flag == '0':
            self.skipTest("No lmdb backend")
        # These must be set before the parent setUp opens the database.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super().setUp()

    def tearDown(self):
        super().tearDown()
3753
3754
class BadTypeTests(TestCase):
    """Check that wrongly-typed arguments are rejected with TypeError."""

    def test_control(self):
        db = ldb.Ldb()
        with self.assertRaises(TypeError):
            ldb.Control('<bad type>', 'relax:1')
        # NOTE(review): the first argument below is the ldb module itself,
        # not the Ldb instance created above -- confirm that is intended.
        with self.assertRaises(TypeError):
            ldb.Control(ldb, 1234)

    def test_modify(self):
        db = ldb.Ldb()
        message = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.modify('<bad type>')
        with self.assertRaises(TypeError):
            db.modify(message, '<bad type>')

    def test_add(self):
        db = ldb.Ldb()
        message = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(message, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite the test name this calls add(), not
        # delete() -- confirm which operation was meant to be exercised.
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(target, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite the test name this calls add(), not
        # rename() -- confirm which operation was meant to be exercised.
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>', target)
        with self.assertRaises(TypeError):
            db.add(target, '<bad type>')
        with self.assertRaises(TypeError):
            db.add(target, target, '<bad type>')

    def test_search(self):
        db = ldb.Ldb()
        # Each keyword argument in turn receives a value of the wrong type.
        bad_keyword_args = [
            {"base": 1234},
            {"scope": '<bad type>'},
            {"expression": 1234},
            {"attrs": '<bad type>'},
            {"controls": '<bad type>'},
        ]
        for kwargs in bad_keyword_args:
            with self.assertRaises(TypeError):
                db.search(**kwargs)
3795
3796
class VersionTests(TestCase):
    """Sanity check on the module's version attribute."""

    def test_version(self):
        # The bindings must expose their version as a str.
        version = ldb.__version__
        self.assertIsInstance(version, str)
3801
class NestedTransactionTests(LdbBaseTest):
    """Document how ldb currently behaves with nested transactions."""

    def setUp(self):
        super(NestedTransactionTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        self.ldb.add({"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]})

        # NOTE(review): the parent setUp was already called at the top of
        # this method; this second call looks redundant -- confirm against
        # LdbBaseTest.setUp before removing it.
        super(NestedTransactionTests, self).setUp()

    #
    # This test documents that currently ldb does not support true nested
    # transactions.
    #
    # Note: The test is written so that it treats failure as pass.
    #       It is done this way as standalone ldb builds do not use the samba
    #       known fail mechanism
    #
    def test_nested_transactions(self):

        self.ldb.transaction_start()

        self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1"})
        res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)

        self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2"})
        res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)

        # Start a nested transaction, add a record in it, then cancel it.
        self.ldb.transaction_start()
        self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3"})
        res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)
        self.ldb.transaction_cancel()
        #
        # Check that we can not see the record added by the cancelled
        # transaction.
        # Currently this fails as ldb does not support true nested
        # transactions, and only the outer commits and cancels have an effect
        #
        res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
                              base="dc=samba,dc=org")
        #
        # FIXME this test currently passes on a failure, i.e. if nested
        #       transaction support worked correctly the correct test would
        #       be:
        #         self.assertEqual(len(res), 0)
        #       as the add of objectUUID=0123456789abcde3 would be reverted
        #       when the sub transaction it was nested in was rolled back.
        #
        #       Currently this is not the case so the record is still present.
        self.assertEqual(len(res), 1)

        # Commit the outer transaction
        #
        self.ldb.transaction_commit()
        #
        # Now check we can still see the records added in the outer
        # transaction.
        #
        res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)
        res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 1)
        #
        # And that we can't see the records added by the nested transaction.
        #
        res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
                              base="dc=samba,dc=org")
        # FIXME again, if nested transactions worked correctly we would not
        #       see this record. The test should be:
        #         self.assertEqual(len(res), 0)
        self.assertEqual(len(res), 1)

    def tearDown(self):
        # Remove the scratch directory created in setUp so that test runs
        # do not leave temporary database files behind.
        shutil.rmtree(self.testdir)
        super(NestedTransactionTests, self).tearDown()
3891
3892
class LmdbNestedTransactionTests(NestedTransactionTests):
    """Run the nested-transaction test against the mdb:// backend."""

    def setUp(self):
        # HAVE_LMDB=0 in the environment means the backend is unavailable;
        # an unset variable is treated as available.
        lmdb_flag = os.environ.get('HAVE_LMDB', '1')
        if lmdb_flag == '0':
            self.skipTest("No lmdb backend")
        # These must be set before the parent setUp opens the database.
        self.prefix = MDB_PREFIX
        self.index = MDB_INDEX_OBJ
        super().setUp()

    def tearDown(self):
        super().tearDown()
3904
3905
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    import unittest
    unittest.main()