PY3: change shebang to python3 in lib dir
[amitay/samba.git] / lib / ldb / tests / python / index.py
1 #!/usr/bin/env python3
2 #
3 # Tests for truncated index keys
4 #
5 #   Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 #
20 """Tests for truncated index keys
21
22 Databases such as lmdb have a maximum key length, these tests ensure that
23 ldb behaves correctly in those circumstances.
24
25 """
26
27 import os
28 from unittest import TestCase
29 import sys
30 import ldb
31 import shutil
32
33 PY3 = sys.version_info > (3, 0)
34
35 TDB_PREFIX = "tdb://"
36 MDB_PREFIX = "mdb://"
37
38
def tempdir():
    """Create a fresh temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set, the directory is
    created under its "tmp" sub-directory; otherwise the tempfile module's
    default location is used.
    """
    import tempfile

    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
46
47
def contains(result, dn):
    """Return True if any entry in ``result`` has a "dn" equal to ``dn``.

    Each entry's "dn" value is converted with str() before the comparison.
    A ``result`` of None is treated as containing nothing.
    """
    if result is None:
        return False
    return any(str(entry["dn"]) == dn for entry in result)
56
57
class LdbBaseTest(TestCase):
    """Common base for the ldb tests: picks the backend URL prefix and flags.

    Subclasses may define ``prefix`` to choose a backend; when it is missing
    or None the TDB prefix is used.  ``url()`` expects ``self.filename`` to
    have been set by the subclass.
    """

    def setUp(self):
        super(LdbBaseTest, self).setUp()
        # Fall back to TDB when no backend prefix was supplied.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        super(LdbBaseTest, self).tearDown()

    def url(self):
        # Backend prefix followed by the database file name.
        return "".join((self.prefix, self.filename))

    def flags(self):
        # Only the MDB backend gets ldb.FLG_NOSYNC; everything else gets 0.
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
78
79
80 class MaxIndexKeyLengthTests(LdbBaseTest):
81     def checkGuids(self, key, guids):
82         #
83         # This check relies on the current implementation where the indexes
84         # are in the same database as the data.
85         #
86         # It checks that the index record exists, unless guids is None then
87         # the record must not exist. And the it contains the expected guid
88         # entries.
89         #
90         # The caller needs to provide the GUID's in the expected order
91         #
92         res = self.l.search(
93             base=key,
94             scope=ldb.SCOPE_BASE)
95         if guids is None:
96             self.assertEqual(len(res), 0)
97             return
98         self.assertEqual(len(res), 1)
99
100         # The GUID index format has only one value
101         index = res[0]["@IDX"][0]
102         self.assertEqual(len(guids), len(index))
103         self.assertEqual(guids, index)
104
105     def tearDown(self):
106         shutil.rmtree(self.testdir)
107         super(MaxIndexKeyLengthTests, self).tearDown()
108
109         # Ensure the LDB is closed now, so we close the FD
110         del(self.l)
111
112     def setUp(self):
113         super(MaxIndexKeyLengthTests, self).setUp()
114         self.testdir = tempdir()
115         self.filename = os.path.join(self.testdir, "key_len_test.ldb")
116         # Note that the maximum key length is set to 54
117         # This accounts for the 4 bytes added by the dn formatting
118         # a leading dn=, and a trailing zero terminator
119         #
120         self.l = ldb.Ldb(self.url(),
121                          options=[
122                              "modules:rdn_name",
123                              "max_key_len_for_self_test:54"])
124         self.l.add({"dn": "@ATTRIBUTES",
125                     "uniqueThing": "UNIQUE_INDEX"})
126         self.l.add({"dn": "@INDEXLIST",
127                     "@IDXATTR": [
128                         b"uniqueThing",
129                         b"notUnique",
130                         b"base64____lt",
131                         b"base64_____eq",
132                         b"base64______gt"],
133                     "@IDXONE": [b"1"],
134                     "@IDXGUID": [b"objectUUID"],
135                     "@IDX_DN_GUID": [b"GUID"]})
136
137     # Add a value to a unique index that exceeds the maximum key length
138     # This should be rejected.
139     def test_add_long_unique_add(self):
140         try:
141             self.l.add({"dn": "OU=UNIQUE_MAX_LEN,DC=SAMBA,DC=ORG",
142                         "objectUUID": b"0123456789abcdef",
143                         "uniqueThing": "01234567890123456789012345678901"})
144             # index key will be
145             # "@INDEX:UNIQUETHING:01234567890123456789012345678901"
146             self.fail("Should have failed on long index key")
147
148         except ldb.LdbError as err:
149             enum = err.args[0]
150             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
151
152     # Test that DN's longer the maximum key length can be added
153     # and that duplicate DN's are rejected correctly
154     def test_add_long_dn_add(self):
155         #
156         # For all entries the DN index key gets truncated to
157         # @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
158         #
159         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
160                     "objectUUID": b"0123456789abcdef"})
161         self.checkGuids(
162             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
163             b"0123456789abcdef")
164
165         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
166                     "objectUUID": b"0123456789abcde0"})
167         self.checkGuids(
168             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
169             b"0123456789abcde0" + b"0123456789abcdef")
170
171         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
172                     "objectUUID": b"0123456789abcde1"})
173         self.checkGuids(
174             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
175             b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcdef")
176
177         # Key is equal to max length does not get inserted into the truncated
178         # key namespace
179         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
180                     "objectUUID": b"0123456789abcde5"})
181         self.checkGuids(
182             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
183             b"0123456789abcde5")
184
185         # This key should not get truncated, as it's one character less than
186         # max, and will not be in the truncate name space
187         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
188                     "objectUUID": b"0123456789abcde7"})
189         self.checkGuids(
190             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
191             b"0123456789abcde7")
192
193         try:
194             self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
195                         "objectUUID": b"0123456789abcde2"})
196         except ldb.LdbError as err:
197             enum = err.args[0]
198             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
199
200         try:
201             self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
202                         "objectUUID": b"0123456789abcde3"})
203         except ldb.LdbError as err:
204             enum = err.args[0]
205             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
206
207         try:
208             self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
209                         "objectUUID": b"0123456789abcde4"})
210         except ldb.LdbError as err:
211             enum = err.args[0]
212             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
213
214         try:
215             self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
216                         "objectUUID": b"0123456789abcde6"})
217         except ldb.LdbError as err:
218             enum = err.args[0]
219             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
220
221         try:
222             self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXX,DC=SAMBA",
223                         "objectUUID": b"0123456789abcde8"})
224         except ldb.LdbError as err:
225             enum = err.args[0]
226             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
227
228     def test_rename_truncated_dn_keys(self):
229         # For all entries the DN index key gets truncated to
230         #    0        1         2         3         4         5
231         #    12345678901234567890123456789012345678901234567890
232         #    @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
233         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
234                     "objectUUID": b"0123456789abcdef"})
235         self.checkGuids(
236             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
237             b"0123456789abcdef")
238
239         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
240                     "objectUUID": b"0123456789abcde0"})
241         self.checkGuids(
242             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
243             b"0123456789abcde0" + b"0123456789abcdef")
244
245         # Non conflicting rename, should succeed
246         self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
247                       "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
248         # Index should be unchanged.
249         self.checkGuids(
250             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
251             b"0123456789abcde0" + b"0123456789abcdef")
252
253         # Conflicting rename should fail
254         try:
255             self.l.rename("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
256                           "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
257         except ldb.LdbError as err:
258             enum = err.args[0]
259             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
260
261     def test_delete_truncated_dn_keys(self):
262         #
263         # For all entries the DN index key gets truncated to
264         #    0        1         2         3         4         5
265         #    12345678901234567890123456789012345678901234567890
266         #    @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
267         #
268         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
269                     "objectUUID": b"0123456789abcdef"})
270         self.checkGuids(
271             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
272             b"0123456789abcdef")
273
274         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
275                     "objectUUID": b"0123456789abcde1"})
276         self.checkGuids(
277             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
278             b"0123456789abcde1" + b"0123456789abcdef")
279
280         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
281                     "objectUUID": b"0123456789abcde5"})
282         self.checkGuids(
283             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
284             b"0123456789abcde5")
285
286         # Try to delete a non existent DN with a truncated key
287         try:
288             self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM")
289         except ldb.LdbError as err:
290             enum = err.args[0]
291             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
292             # Ensure that non of the other truncated DN's got deleted
293             res = self.l.search(
294                 base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
295             self.assertEqual(len(res), 1)
296
297             res = self.l.search(
298                 base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
299             self.assertEqual(len(res), 1)
300
301             # Ensure that the non truncated DN did not get deleted
302             res = self.l.search(
303                 base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
304             self.assertEqual(len(res), 1)
305
306             # Check the indexes are correct
307             self.checkGuids(
308                 "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
309                 b"0123456789abcde1" + b"0123456789abcdef")
310             self.checkGuids(
311                 "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
312                 b"0123456789abcde5")
313
314         # delete an existing entry
315         self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
316
317         # Ensure it got deleted
318         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
319         self.assertEqual(len(res), 0)
320
321         # Ensure that non of the other truncated DN's got deleted
322         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
323         self.assertEqual(len(res), 1)
324
325         # Ensure the non truncated entry did not get deleted.
326         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
327         self.assertEqual(len(res), 1)
328
329         # Check the indexes are correct
330         self.checkGuids(
331             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
332             b"0123456789abcde1")
333         self.checkGuids(
334             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
335             b"0123456789abcde5")
336
337         # delete an existing entry
338         self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
339
340         # Ensure it got deleted
341         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXX,DC=SAMBA,DC=GOV")
342         self.assertEqual(len(res), 0)
343
344         # Ensure that non of the non truncated DN's got deleted
345         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
346         self.assertEqual(len(res), 1)
347         # Check the indexes are correct
348         self.checkGuids(
349             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
350             None)
351         self.checkGuids(
352             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
353             b"0123456789abcde5")
354
355         # delete an existing entry
356         self.l.delete("OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
357
358         # Ensure it got deleted
359         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBAxxx")
360         self.assertEqual(len(res), 0)
361         self.checkGuids(
362             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
363             None)
364
365     def test_search_truncated_dn_keys(self):
366         #
367         # For all entries the DN index key gets truncated to
368         #    0        1         2         3         4         5
369         #    12345678901234567890123456789012345678901234567890
370         #    @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
371         #
372         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
373                     "objectUUID": b"0123456789abcdef"})
374         self.checkGuids(
375             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
376             b"0123456789abcdef")
377
378         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
379                     "objectUUID": b"0123456789abcde1"})
380         self.checkGuids(
381             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
382             b"0123456789abcde1" + b"0123456789abcdef")
383
384         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
385                     "objectUUID": b"0123456789abcde5"})
386         self.checkGuids(
387             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
388             b"0123456789abcde5")
389
390         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
391         self.assertEqual(len(res), 1)
392
393         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
394         self.assertEqual(len(res), 1)
395
396         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
397         self.assertEqual(len(res), 1)
398
399         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM")
400         self.assertEqual(len(res), 0)
401
402         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
403         self.assertEqual(len(res), 0)
404
405         # Non existent, key one less than truncation limit
406         res = self.l.search(base="OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA")
407         self.assertEqual(len(res), 0)
408
409     def test_search_dn_filter_truncated_dn_keys(self):
410         #
411         # For all entries the DN index key gets truncated to
412         #    0        1         2         3         4         5
413         #    12345678901234567890123456789012345678901234567890
414         #    @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
415         #
416         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
417                     "objectUUID": b"0123456789abcdef"})
418         self.checkGuids(
419             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
420             b"0123456789abcdef")
421
422         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
423                     "objectUUID": b"0123456789abcde1"})
424         self.checkGuids(
425             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
426             b"0123456789abcde1" + b"0123456789abcdef")
427
428         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
429                     "objectUUID": b"0123456789abcde5"})
430         self.checkGuids(
431             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
432             b"0123456789abcde5")
433
434         res = self.l.search(
435             expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG")
436         self.assertEqual(len(res), 1)
437
438         res = self.l.search(
439             expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
440         self.assertEqual(len(res), 1)
441
442         res = self.l.search(
443             expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA")
444         self.assertEqual(len(res), 1)
445
446         res = self.l.search(
447             expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM")
448         self.assertEqual(len(res), 0)
449
450         res = self.l.search(
451             expression="dn=OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV")
452         self.assertEqual(len(res), 0)
453
454         # Non existent, key one less than truncation limit
455         res = self.l.search(
456             expression="dn=OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA")
457         self.assertEqual(len(res), 0)
458
459     def test_search_one_level_truncated_dn_keys(self):
460         #
461         # Except for the base DN's
462         # all entries the DN index key gets truncated to
463         #    0        1         2         3         4         5
464         #    12345678901234567890123456789012345678901234567890
465         #    @INDEX:@IDXDN:OU=??,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA
466         # The base DN-s truncate to
467         #    @INDEX:@IDXDN:OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR
468         #
469         self.l.add({"dn": "OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
470                     "objectUUID": b"0123456789abcdef"})
471         self.l.add({"dn": "OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
472                     "objectUUID": b"0123456789abcd1f"})
473         self.checkGuids(
474             "@INDEX#@IDXDN#OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR",
475             b"0123456789abcd1f" + b"0123456789abcdef")
476
477         self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
478                     "objectUUID": b"0123456789abcde1"})
479         self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
480                     "objectUUID": b"0123456789abcd11"})
481         self.checkGuids(
482             "@INDEX#@IDXDN#OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
483             b"0123456789abcd11" + b"0123456789abcde1")
484
485         self.l.add({"dn": "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
486                     "objectUUID": b"0123456789abcde2"})
487         self.l.add({"dn": "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
488                     "objectUUID": b"0123456789abcdf2"})
489         self.checkGuids(
490             "@INDEX#@IDXDN#OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
491             b"0123456789abcde2" + b"0123456789abcdf2")
492
493         self.l.add({"dn": "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
494                     "objectUUID": b"0123456789abcde3"})
495         self.l.add({"dn": "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
496                     "objectUUID": b"0123456789abcd13"})
497         self.checkGuids(
498             "@INDEX#@IDXDN#OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
499             b"0123456789abcd13" + b"0123456789abcde3")
500
501         # This key is not truncated as it's the max_key_len
502         self.l.add({"dn": "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
503                     "objectUUID": b"0123456789abcde7"})
504         self.checkGuids(
505             "@INDEX:@IDXDN:OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
506             b"0123456789abcde7")
507
508         res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1",
509                             scope=ldb.SCOPE_ONELEVEL)
510         self.assertEqual(len(res), 3)
511         self.assertTrue(
512             contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1"))
513         self.assertTrue(
514             contains(res, "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1"))
515         self.assertTrue(
516             contains(res, "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR1"))
517
518         res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2",
519                             scope=ldb.SCOPE_ONELEVEL)
520         self.assertEqual(len(res), 3)
521         self.assertTrue(
522             contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2"))
523         self.assertTrue(
524             contains(res, "OU=02,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2"))
525         self.assertTrue(
526             contains(res, "OU=03,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA,DC=OR2"))
527
528         res = self.l.search(base="OU=A_LONG_DN_ONE_LVLX,DC=SAMBA",
529                             scope=ldb.SCOPE_ONELEVEL)
530         self.assertEqual(len(res), 1)
531         self.assertTrue(
532             contains(res, "OU=01,OU=A_LONG_DN_ONE_LVLX,DC=SAMBA"))
533
534     def test_search_sub_tree_truncated_dn_keys(self):
535         #
536         # Except for the base DN's
537         # all entries the DN index key gets truncated to
538         #    0        1         2         3         4         5
539         #    12345678901234567890123456789012345678901234567890
540         #    @INDEX:@IDXDN:OU=??,OU=A_LONG_DN_SUB_TREE,DC=SAMBA
541         # The base DN-s truncate to
542         #    @INDEX:@IDXDN:OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR
543         #
544         self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
545                     "objectUUID": b"0123456789abcdef"})
546         self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
547                     "objectUUID": b"0123456789abcde4"})
548         self.l.add({"dn": "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3",
549                     "objectUUID": b"0123456789abcde8"})
550         self.checkGuids(
551             "@INDEX#@IDXDN#OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR",
552             b"0123456789abcde4" + b"0123456789abcde8" + b"0123456789abcdef")
553
554         self.l.add({"dn": "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
555                     "objectUUID": b"0123456789abcde1"})
556         self.l.add({"dn": "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
557                     "objectUUID": b"0123456789abcde5"})
558         self.checkGuids(
559             "@INDEX#@IDXDN#OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
560             b"0123456789abcde1" + b"0123456789abcde5")
561
562         self.l.add({"dn": "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
563                     "objectUUID": b"0123456789abcde2"})
564         self.l.add({"dn": "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
565                     "objectUUID": b"0123456789abcde6"})
566         self.checkGuids(
567             "@INDEX#@IDXDN#OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
568             b"0123456789abcde2" + b"0123456789abcde6")
569
570         self.l.add({"dn": "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
571                     "objectUUID": b"0123456789abcde3"})
572
573         self.l.add({"dn": "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
574                     "objectUUID": b"0123456789abcde7"})
575         self.checkGuids(
576             "@INDEX#@IDXDN#OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
577             b"0123456789abcde3" + b"0123456789abcde7")
578
579         self.l.add({"dn": "OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4",
580                     "objectUUID": b"0123456789abcde9"})
581         self.checkGuids(
582             "@INDEX#@IDXDN#OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA",
583             b"0123456789abcde9")
584
585         res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1",
586                             scope=ldb.SCOPE_SUBTREE)
587         self.assertEqual(len(res), 4)
588         self.assertTrue(
589             contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
590         self.assertTrue(
591             contains(res, "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
592         self.assertTrue(
593             contains(res, "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
594         self.assertTrue(
595             contains(res, "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR1"))
596
597         res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2",
598                             scope=ldb.SCOPE_SUBTREE)
599         self.assertEqual(len(res), 4)
600         self.assertTrue(
601             contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
602         self.assertTrue(
603             contains(res, "OU=01,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
604         self.assertTrue(
605             contains(res, "OU=02,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
606         self.assertTrue(
607             contains(res, "OU=03,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR2"))
608
609         res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3",
610                             scope=ldb.SCOPE_SUBTREE)
611         self.assertEqual(len(res), 1)
612         self.assertTrue(
613             contains(res, "OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR3"))
614
615         res = self.l.search(base="OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4",
616                             scope=ldb.SCOPE_SUBTREE)
617         self.assertEqual(len(res), 1)
618         self.assertTrue(
619             contains(res, "OU=04,OU=A_LONG_DN_SUB_TREE,DC=SAMBA,DC=OR4"))
620
621     def test_search_base_truncated_dn_keys(self):
622         #
623         # For all entries the DN index key gets truncated to
624         #    0        1         2         3         4         5
625         #    12345678901234567890123456789012345678901234567890
626         #    @INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA
627         #
628         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
629                     "objectUUID": b"0123456789abcdef"})
630         self.checkGuids(
631             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
632             b"0123456789abcdef")
633
634         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
635                     "objectUUID": b"0123456789abcde1"})
636         self.checkGuids(
637             "@INDEX#@IDXDN#OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
638             b"0123456789abcde1" + b"0123456789abcdef")
639
640         self.l.add({"dn": "OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
641                     "objectUUID": b"0123456789abcde5"})
642         self.checkGuids(
643             "@INDEX:@IDXDN:OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
644             b"0123456789abcde5")
645
646         res = self.l.search(
647             base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=ORG",
648             scope=ldb.SCOPE_BASE)
649         self.assertEqual(len(res), 1)
650
651         res = self.l.search(
652             base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
653             scope=ldb.SCOPE_BASE)
654         self.assertEqual(len(res), 1)
655
656         res = self.l.search(
657             base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA",
658             scope=ldb.SCOPE_BASE)
659         self.assertEqual(len(res), 1)
660
661         res = self.l.search(
662             base="OU=A_LONG_DNXXXXXXXXXXXXXXX,DC=SAMBA,DC=COM",
663             scope=ldb.SCOPE_BASE)
664         self.assertEqual(len(res), 0)
665
666         res = self.l.search(
667             base="OU=A_LONG_DNXXXXXXXXXXXX,DC=SAMBA,DC=GOV",
668             scope=ldb.SCOPE_BASE)
669         self.assertEqual(len(res), 0)
670
671         # Non existent, key one less than truncation limit
672         res = self.l.search(
673             base="OU=A_LONG_DNXXXXXXXXXXXXXX,DC=SAMBA",
674             scope=ldb.SCOPE_BASE)
675         self.assertEqual(len(res), 0)
676
677     #
678     # Test non unique index searched with truncated keys
679     #
680     def test_index_truncated_keys(self):
681         # 0        1         2         3         4         5
682         # 12345678901234567890123456789012345678901234567890
683         # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
684
685         eq_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
686         gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
687         lt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
688         # > than max length and differs in values that will be truncated
689         gt_max_b = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
690
691         # Add two entries with the same value, key length = max so no
692         # truncation.
693         self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
694                     "notUnique": eq_max,
695                     "objectUUID": b"0123456789abcde0"})
696         self.checkGuids(
697             "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
698             b"0123456789abcde0")
699
700         self.l.add({"dn": "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
701                     "notUnique": eq_max,
702                     "objectUUID": b"0123456789abcde1"})
703         self.checkGuids(
704             "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
705             b"0123456789abcde0" + b"0123456789abcde1")
706
707         #
708         # An entry outside the tree
709         #
710         self.l.add({"dn": "OU=10,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG",
711                     "notUnique": eq_max,
712                     "objectUUID": b"0123456789abcd11"})
713         self.checkGuids(
714             "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
715             b"0123456789abcd11" + b"0123456789abcde0" + b"0123456789abcde1")
716
717         # Key longer than max so should get truncated to same key as
718         # the previous two entries
719         self.l.add({"dn": "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
720                     "notUnique": gt_max,
721                     "objectUUID": b"0123456789abcde2"})
722         # But in the truncated key space
723         self.checkGuids(
724             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
725             b"0123456789abcde2")
726
727         # Key longer than max so should get truncated to same key as
728         # the previous entries but differs in the chars after max length
729         self.l.add({"dn": "OU=23,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
730                     "notUnique": gt_max_b,
731                     "objectUUID": b"0123456789abcd22"})
732         # But in the truncated key space
733         self.checkGuids(
734             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
735             b"0123456789abcd22" + b"0123456789abcde2")
736         #
737         # An entry outside the tree
738         #
739         self.l.add({"dn": "OU=11,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG",
740                     "notUnique": gt_max,
741                     "objectUUID": b"0123456789abcd12"})
742         self.checkGuids(
743             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
744             b"0123456789abcd12" + b"0123456789abcd22" + b"0123456789abcde2")
745
746         # Key shorter than max
747         #
748         self.l.add({"dn": "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
749                     "notUnique": lt_max,
750                     "objectUUID": b"0123456789abcde3"})
751         self.checkGuids(
752             "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
753             b"0123456789abcde3")
754         #
755         # An entry outside the tree
756         #
757         self.l.add({"dn": "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG",
758                     "notUnique": lt_max,
759                     "objectUUID": b"0123456789abcd13"})
760         self.checkGuids(
761             "@INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
762             b"0123456789abcd13" + b"0123456789abcde3")
763
764         #
765         # search for target is max value not truncated
766         # should return ou's 01, 02
767         #
768         expression = "(notUnique=" + eq_max.decode('ascii') + ")"
769         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
770                             scope=ldb.SCOPE_ONELEVEL,
771                             expression=expression)
772         self.assertEqual(len(res), 2)
773         self.assertTrue(
774             contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
775         self.assertTrue(
776             contains(res, "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
777         #
778         # search for target is max value not truncated
779         # search one level up the tree, scope is ONE_LEVEL
780         # So should get no matches
781         #
782         expression = "(notUnique=" + eq_max.decode('ascii') + ")"
783         res = self.l.search(base="DC=SAMBA,DC=ORG",
784                             scope=ldb.SCOPE_ONELEVEL,
785                             expression=expression)
786         self.assertEqual(len(res), 0)
787         #
788         # search for target is max value not truncated
789         # search one level up the tree, scope is SUBTREE
790         # So should get 3 matches
791         #
792         res = self.l.search(base="DC=SAMBA,DC=ORG",
793                             scope=ldb.SCOPE_SUBTREE,
794                             expression=expression)
795         self.assertEqual(len(res), 3)
796         self.assertTrue(
797             contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
798         self.assertTrue(
799             contains(res, "OU=02,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
800         self.assertTrue(
801             contains(res, "OU=10,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
802         #
803         # search for target is max value + 1 so truncated
804         # should return ou 23 as it's gt_max_b being searched for
805         #
806         expression = "(notUnique=" + gt_max_b.decode('ascii') + ")"
807         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
808                             scope=ldb.SCOPE_ONELEVEL,
809                             expression=expression)
810         self.assertEqual(len(res), 1)
811         self.assertTrue(
812             contains(res, "OU=23,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
813
814         #
815         # search for target is max value + 1 so truncated
816         # should return ou 03 as it's gt_max being searched for
817         #
818         expression = "(notUnique=" + gt_max.decode('ascii') + ")"
819         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
820                             scope=ldb.SCOPE_ONELEVEL,
821                             expression=expression)
822         self.assertEqual(len(res), 1)
823         self.assertTrue(
824             contains(res, "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
825
826         #
827         # scope one level and one level up one level up should get no matches
828         #
829         res = self.l.search(base="DC=SAMBA,DC=ORG",
830                             scope=ldb.SCOPE_ONELEVEL,
831                             expression=expression)
832         self.assertEqual(len(res), 0)
833         #
834         # scope sub tree and one level up one level up should get 2 matches
835         #
836         res = self.l.search(base="DC=SAMBA,DC=ORG",
837                             scope=ldb.SCOPE_SUBTREE,
838                             expression=expression)
839         self.assertEqual(len(res), 2)
840         self.assertTrue(
841             contains(res, "OU=03,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
842         self.assertTrue(
843             contains(res, "OU=11,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
844
845         #
846         # search for target is max value - 1 so not truncated
847         # should return ou 04
848         #
849         expression = "(notUnique=" + lt_max.decode('ascii') + ")"
850         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
851                             scope=ldb.SCOPE_ONELEVEL,
852                             expression=expression)
853         self.assertEqual(len(res), 1)
854         self.assertTrue(
855             contains(res, "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
856
857         #
858         # scope one level and one level up one level up should get no matches
859         #
860         res = self.l.search(base="DC=SAMBA,DC=ORG",
861                             scope=ldb.SCOPE_ONELEVEL,
862                             expression=expression)
863         self.assertEqual(len(res), 0)
864
865         #
866         # scope sub tree and one level up one level up should get 2 matches
867         #
868         res = self.l.search(base="DC=SAMBA,DC=ORG",
869                             scope=ldb.SCOPE_SUBTREE,
870                             expression=expression)
871         self.assertEqual(len(res), 2)
872         self.assertTrue(
873             contains(res, "OU=04,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
874         self.assertTrue(
875             contains(res, "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
876
877     #
878     # Test index key truncation for base64 encoded values
879     #
880     def test_index_truncated_base64_encoded_keys(self):
881         value = b"aaaaaaaaaaaaaaaaaaaa\x02"
882         # base64 encodes to "YWFhYWFhYWFhYWFhYWFhYWFhYWEC"
883
884         # One less than max key length
885         self.l.add({"dn": "OU=01,OU=BASE64,DC=SAMBA,DC=ORG",
886                     "base64____lt": value,
887                     "objectUUID": b"0123456789abcde0"})
888         self.checkGuids(
889             "@INDEX:BASE64____LT::YWFhYWFhYWFhYWFhYWFhYWFhYWEC",
890             b"0123456789abcde0")
891
892         # Equal max key length
893         self.l.add({"dn": "OU=02,OU=BASE64,DC=SAMBA,DC=ORG",
894                     "base64_____eq": value,
895                     "objectUUID": b"0123456789abcde1"})
896         self.checkGuids(
897             "@INDEX:BASE64_____EQ::YWFhYWFhYWFhYWFhYWFhYWFhYWEC",
898             b"0123456789abcde1")
899
900         # One greater than max key length
901         self.l.add({"dn": "OU=03,OU=BASE64,DC=SAMBA,DC=ORG",
902                     "base64______gt": value,
903                     "objectUUID": b"0123456789abcde2"})
904         self.checkGuids(
905             "@INDEX#BASE64______GT##YWFhYWFhYWFhYWFhYWFhYWFhYWE",
906             b"0123456789abcde2")
907     #
908     # Test adding to non unique index with identical multivalued index
909     # attributes
910     #
911
912     def test_index_multi_valued_identical_keys(self):
913         # 0        1         2         3         4         5
914         # 12345678901234567890123456789012345678901234567890
915         # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
916         as_eq_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
917         bs_eq_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
918
919         try:
920             self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
921                         "notUnique": [bs_eq_max, as_eq_max, as_eq_max],
922                         "objectUUID": b"0123456789abcde0"})
923             self.fail("Exception not thrown")
924         except ldb.LdbError as e:
925             code = e.args[0]
926             self.assertEqual(ldb.ERR_ATTRIBUTE_OR_VALUE_EXISTS, code)
927
928     #
929     # Test non unique index with multivalued index attributes
    #  searched with truncated keys
931     #
932     def test_search_index_multi_valued_truncated_keys(self):
933         # 0        1         2         3         4         5
934         # 12345678901234567890123456789012345678901234567890
935         # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
936
937         aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
938         ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
939         bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
940
941         self.l.add({"dn": "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
942                     "notUnique": [aa_gt_max, ab_gt_max, bb_gt_max],
943                     "objectUUID": b"0123456789abcde0"})
944         self.checkGuids(
945             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
946             b"0123456789abcde0" + b"0123456789abcde0")
947         self.checkGuids(
948             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
949             b"0123456789abcde0")
950
951         expression = "(notUnique=" + aa_gt_max.decode('ascii') + ")"
952         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
953                             scope=ldb.SCOPE_ONELEVEL,
954                             expression=expression)
955         self.assertEqual(len(res), 1)
956         self.assertTrue(
957             contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
958
959         expression = "(notUnique=" + ab_gt_max.decode('ascii') + ")"
960         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
961                             scope=ldb.SCOPE_ONELEVEL,
962                             expression=expression)
963         self.assertEqual(len(res), 1)
964         self.assertTrue(
965             contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
966
967         expression = "(notUnique=" + bb_gt_max.decode('ascii') + ")"
968         res = self.l.search(base="OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG",
969                             scope=ldb.SCOPE_ONELEVEL,
970                             expression=expression)
971         self.assertEqual(len(res), 1)
972         self.assertTrue(
973             contains(res, "OU=01,OU=SEARCH_NON_UNIQUE,DC=SAMBA,DC=ORG"))
974
975     #
976     # Test deletion of records with non unique index with multivalued index
977     # attributes
978     # replicate this to test modify with modify flags i.e. DELETE, REPLACE
979     #
980     def test_delete_index_multi_valued_truncated_keys(self):
981         # 0        1         2         3         4         5
982         # 12345678901234567890123456789012345678901234567890
983         # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
984
985         aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
986         ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
987         bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
988         cc_gt_max = b"cccccccccccccccccccccccccccccccccc"
989
990         self.l.add({"dn": "OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG",
991                     "notUnique": [aa_gt_max, ab_gt_max, bb_gt_max],
992                     "objectUUID": b"0123456789abcde0"})
993         self.l.add({"dn": "OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG",
994                     "notUnique": [aa_gt_max, ab_gt_max, cc_gt_max],
995                     "objectUUID": b"0123456789abcde1"})
996         self.checkGuids(
997             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
998             b"0123456789abcde0" + b"0123456789abcde0" +
999             b"0123456789abcde1" + b"0123456789abcde1")
1000         self.checkGuids(
1001             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1002             b"0123456789abcde0")
1003         self.checkGuids(
1004             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1005             b"0123456789abcde1")
1006
1007         res = self.l.search(
1008             base="DC=SAMBA,DC=ORG",
1009             expression="(notUnique=" + aa_gt_max.decode("ascii") + ")")
1010         self.assertEqual(2, len(res))
1011         self.assertTrue(
1012             contains(res, "OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG"))
1013         self.assertTrue(
1014             contains(res, "OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG"))
1015
1016         self.l.delete("OU=02,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG")
1017         self.checkGuids(
1018             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1019             b"0123456789abcde0" + b"0123456789abcde0")
1020         self.checkGuids(
1021             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1022             b"0123456789abcde0")
1023         self.checkGuids(
1024             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1025             None)
1026
1027         self.l.delete("OU=01,OU=DELETE_NON_UNIQUE,DC=SAMBA,DC=ORG")
1028         self.checkGuids(
1029             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1030             None)
1031         self.checkGuids(
1032             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1033             None)
1034         self.checkGuids(
1035             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1036             None)
1037
1038     #
1039     # Test modification of records with non unique index with multivalued index
1040     # attributes
1041     #
1042     def test_modify_index_multi_valued_truncated_keys(self):
1043         # 0        1         2         3         4         5
1044         # 12345678901234567890123456789012345678901234567890
1045         # @INDEX:NOTUNIQUE:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
1046
1047         aa_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
1048         ab_gt_max = b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab"
1049         bb_gt_max = b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
1050         cc_gt_max = b"cccccccccccccccccccccccccccccccccc"
1051
1052         self.l.add({"dn": "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG",
1053                     "notUnique": [aa_gt_max, ab_gt_max, bb_gt_max],
1054                     "objectUUID": b"0123456789abcde0"})
1055         self.l.add({"dn": "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG",
1056                     "notUnique": [aa_gt_max, ab_gt_max, cc_gt_max],
1057                     "objectUUID": b"0123456789abcde1"})
1058         self.checkGuids(
1059             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1060             b"0123456789abcde0" + b"0123456789abcde0" +
1061             b"0123456789abcde1" + b"0123456789abcde1")
1062         self.checkGuids(
1063             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1064             b"0123456789abcde0")
1065         self.checkGuids(
1066             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1067             b"0123456789abcde1")
1068
1069         res = self.l.search(
1070             base="DC=SAMBA,DC=ORG",
1071             expression="(notUnique=" + aa_gt_max.decode("ascii") + ")")
1072         self.assertEqual(2, len(res))
1073         self.assertTrue(
1074             contains(res, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG"))
1075         self.assertTrue(
1076             contains(res, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG"))
1077
1078         #
1079         # Modify that does not change the indexed attribute
1080         #
1081         msg = ldb.Message()
1082         msg.dn = ldb.Dn(self.l, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
1083         msg["notUnique"] = ldb.MessageElement(
1084             [aa_gt_max, ab_gt_max, bb_gt_max],
1085             ldb.FLAG_MOD_REPLACE,
1086             "notUnique")
1087         self.l.modify(msg)
1088         #
1089         # As the modify is replacing the attribute with the same contents
1090         # there should be no changes to the indexes.
1091         #
1092         self.checkGuids(
1093             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1094             b"0123456789abcde0" + b"0123456789abcde0" +
1095             b"0123456789abcde1" + b"0123456789abcde1")
1096         self.checkGuids(
1097             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1098             b"0123456789abcde0")
1099         self.checkGuids(
1100             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1101             b"0123456789abcde1")
1102
1103         #
1104         # Modify that removes a value from the indexed attribute
1105         #
1106         msg = ldb.Message()
1107         msg.dn = ldb.Dn(self.l, "OU=01,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
1108         msg["notUnique"] = ldb.MessageElement(
1109             [aa_gt_max, bb_gt_max],
1110             ldb.FLAG_MOD_REPLACE,
1111             "notUnique")
1112         self.l.modify(msg)
1113
1114         self.checkGuids(
1115             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1116             b"0123456789abcde0" +
1117             b"0123456789abcde1" + b"0123456789abcde1")
1118         self.checkGuids(
1119             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1120             b"0123456789abcde0")
1121         self.checkGuids(
1122             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1123             b"0123456789abcde1")
1124
1125         #
1126         # Modify that does a constrained delete the indexed attribute
1127         #
1128         msg = ldb.Message()
1129         msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
1130         msg["notUnique"] = ldb.MessageElement(
1131             [ab_gt_max],
1132             ldb.FLAG_MOD_DELETE,
1133             "notUnique")
1134         self.l.modify(msg)
1135
1136         self.checkGuids(
1137             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1138             b"0123456789abcde0" + b"0123456789abcde1")
1139         self.checkGuids(
1140             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1141             b"0123456789abcde0")
1142         self.checkGuids(
1143             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1144             b"0123456789abcde1")
1145
1146         #
1147         # Modify that does an unconstrained delete the indexed attribute
1148         #
1149         msg = ldb.Message()
1150         msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
1151         msg["notUnique"] = ldb.MessageElement(
1152             [],
1153             ldb.FLAG_MOD_DELETE,
1154             "notUnique")
1155         self.l.modify(msg)
1156
1157         self.checkGuids(
1158             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1159             b"0123456789abcde0")
1160         self.checkGuids(
1161             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1162             b"0123456789abcde0")
1163         self.checkGuids(
1164             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1165             None)
1166
1167         #
1168         # Modify that adds a value to the indexed attribute
1169         #
1170         msg = ldb.Message()
1171         msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
1172         msg["notUnique"] = ldb.MessageElement(
1173             [cc_gt_max],
1174             ldb.FLAG_MOD_ADD,
1175             "notUnique")
1176         self.l.modify(msg)
1177
1178         self.checkGuids(
1179             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1180             b"0123456789abcde0")
1181         self.checkGuids(
1182             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1183             b"0123456789abcde0")
1184         self.checkGuids(
1185             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1186             b"0123456789abcde1")
1187
1188         #
1189         # Modify that adds a values to the indexed attribute
1190         #
1191         msg = ldb.Message()
1192         msg.dn = ldb.Dn(self.l, "OU=02,OU=MODIFY_NON_UNIQUE,DC=SAMBA,DC=ORG")
1193         msg["notUnique"] = ldb.MessageElement(
1194             [aa_gt_max, ab_gt_max],
1195             ldb.FLAG_MOD_ADD,
1196             "notUnique")
1197         self.l.modify(msg)
1198
1199         self.checkGuids(
1200             "@INDEX#NOTUNIQUE#aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
1201             b"0123456789abcde0" +
1202             b"0123456789abcde1" + b"0123456789abcde1")
1203         self.checkGuids(
1204             "@INDEX#NOTUNIQUE#bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
1205             b"0123456789abcde0")
1206         self.checkGuids(
1207             "@INDEX#NOTUNIQUE#ccccccccccccccccccccccccccccccccc",
1208             b"0123456789abcde1")
1209
1210     #
1211     # Test Sub tree searches when checkBaseOnSearch is enabled and the
1212     # DN indexes are truncated and collide.
1213     #
1214     def test_check_base_on_search_truncated_dn_keys(self):
1215         #
1216         # Except for the base DN's
1217         # all entries the DN index key gets truncated to
1218         #    0        1         2         3         4         5
1219         #    12345678901234567890123456789012345678901234567890
1220         #    @INDEX:@IDXDN:OU=??,OU=CHECK_BASE_DN_XXXX,DC=SAMBA
1221         # The base DN-s truncate to
1222         #    @INDEX:@IDXDN:OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR
1223         #
1224         checkbaseonsearch = {"dn": "@OPTIONS",
1225                              "checkBaseOnSearch": b"TRUE"}
1226         self.l.add(checkbaseonsearch)
1227
1228         self.l.add({"dn": "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
1229                     "objectUUID": b"0123456789abcdef"})
1230         self.l.add({"dn": "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
1231                     "objectUUID": b"0123456789abcdee"})
1232         self.checkGuids(
1233             "@INDEX#@IDXDN#OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR",
1234             b"0123456789abcdee" + b"0123456789abcdef")
1235
1236         self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
1237                     "objectUUID": b"0123456789abcdec"})
1238         self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
1239                     "objectUUID": b"0123456789abcdeb"})
1240         self.l.add({"dn": "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3",
1241                     "objectUUID": b"0123456789abcded"})
1242         self.checkGuids(
1243             "@INDEX#@IDXDN#OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA",
1244             b"0123456789abcdeb" + b"0123456789abcdec" + b"0123456789abcded")
1245
1246         self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
1247                     "objectUUID": b"0123456789abcde0"})
1248         self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
1249                     "objectUUID": b"0123456789abcde1"})
1250         self.l.add({"dn": "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3",
1251                     "objectUUID": b"0123456789abcde2"})
1252         self.checkGuids(
1253             "@INDEX#@IDXDN#OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA",
1254             b"0123456789abcde0" + b"0123456789abcde1" + b"0123456789abcde2")
1255
1256         res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1",
1257                             scope=ldb.SCOPE_SUBTREE)
1258         self.assertEqual(len(res), 3)
1259         self.assertTrue(
1260             contains(res, "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1"))
1261         self.assertTrue(
1262             contains(res, "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1"))
1263         self.assertTrue(
1264             contains(res, "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR1"))
1265
1266         res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2",
1267                             scope=ldb.SCOPE_SUBTREE)
1268         self.assertEqual(len(res), 3)
1269         self.assertTrue(
1270             contains(res, "OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2"))
1271         self.assertTrue(
1272             contains(res, "OU=01,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2"))
1273         self.assertTrue(
1274             contains(res, "OU=02,OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR2"))
1275
1276         try:
1277             res = self.l.search(base="OU=CHECK_BASE_DN_XXXX,DC=SAMBA,DC=OR3",
1278                                 scope=ldb.SCOPE_SUBTREE)
1279             self.fail("Expected exception no thrown")
1280         except ldb.LdbError as e:
1281             code = e.args[0]
1282             self.assertEqual(ldb.ERR_NO_SUCH_OBJECT, code)
1283
1284
1285 # Run the index truncation tests against an lmdb backend
class MaxIndexKeyLengthTestsLmdb(MaxIndexKeyLengthTests):
    """MaxIndexKeyLengthTests with the lmdb URL prefix selected.

    Only the prefix differs; every test method and the tear down are
    inherited unchanged.
    """

    def setUp(self):
        # Chosen before the inherited setUp() runs, as
        # LdbBaseTest.setUp() falls back to TDB_PREFIX when no prefix
        # has been set.
        self.prefix = MDB_PREFIX
        super(MaxIndexKeyLengthTestsLmdb, self).setUp()
1294
1295
# Check that adding an @INDEXLIST with @IDX_LMDB_SUBDB is rejected on lmdb
class RejectSubDBIndex(LdbBaseTest):
    """Try to configure a sub-database index on an lmdb backed ldb.

    Every test gets a database in a freshly created temporary directory,
    which is removed again once the test has finished.
    """

    def setUp(self):
        # Chosen before the base class setUp() runs, as that falls back
        # to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        super(RejectSubDBIndex, self).setUp()
        self.testdir = tempdir()
        # Registered before the database is opened, so the directory is
        # removed even when ldb.Ldb() below raises.  Clean-ups run after
        # tearDown(); errors are ignored as this is best-effort tidying.
        self.addCleanup(shutil.rmtree, self.testdir, ignore_errors=True)
        self.filename = os.path.join(self.testdir,
                                     "reject_subidx_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         options=[
                             "modules:rdn_name"])

    def tearDown(self):
        super(RejectSubDBIndex, self).tearDown()

    def test_try_subdb_index(self):
        # NOTE(review): nothing fails this test when the add succeeds;
        # the assertions only run if ldb.LdbError is raised.  Confirm
        # whether the add is guaranteed to fail before adding a
        # self.fail() after it.
        try:
            self.l.add({"dn": "@INDEXLIST",
                        "@IDX_LMDB_SUBDB": [b"1"],
                        "@IDXONE": [b"1"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"],
                        })
        except ldb.LdbError as e:
            # args[0] is the ldb error code, args[1] the error string
            code = e.args[0]
            string = e.args[1]
            self.assertEqual(ldb.ERR_OPERATIONS_ERROR, code)
            self.assertIn("sub-database index", string)
1325
1326
if __name__ == '__main__':
    # Run every test case in this module when executed as a script;
    # unittest.main is the same class as unittest.TestProgram.
    import unittest
    unittest.main()