7d96103bd9521f1623531fbf45e2eb0ac2f1621b
[samba.git] / source4 / dsdb / tests / python / vlv.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import base64
9 import random
10 import re
11
12 sys.path.insert(0, "bin/python")
13 import samba
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 import ldb
20 from samba.samdb import SamDB
21
22 import time
23
# Command-line handling: the option groups from samba.getopt and
# SubunitOptions, plus three switches specific to this script.
parser = optparse.OptionParser(usage="vlv.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

parser.add_option('--elements', dest='elements', type='int', default=20,
                  help="use this many elements in the tests")
parser.add_option('--delete-in-setup', dest='delete_in_setup',
                  action='store_true',
                  help="cleanup in next setup rather than teardown")
parser.add_option('--skip-attr-regex', dest='skip_attr_regex',
                  help="ignore attributes matching this regex")

opts, args = parser.parse_args()

# The host is the one mandatory positional argument.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# Number of users created for every test (see --elements).
N_ELEMENTS = opts.elements
55
56
class VlvTestException(Exception):
    """Exception type declared for this test module.

    It adds nothing to Exception; none of the code visible in this
    file raises it.
    """
59
60
def encode_vlv_control(critical=1,
                       before=0, after=0,
                       offset=None,
                       gte=None,
                       n=0, cookie=None):
    """Build the string form of a VLV request control.

    The result is "vlv:<critical>:<before>:<after>:" followed by either
    "<offset>:<n>" (when offset is given) or a greater-than-or-equal
    target, ">=<gte>". A gte value containing ':' or a NUL byte (or
    bytes that are not valid UTF-8) is sent as "base64>=<encoded>".
    If cookie is not None it is appended after a further ':'.

    gte may be text or bytes. Raises ValueError if neither offset nor
    gte is supplied.
    """
    prefix = "vlv:%d:%d:%d:" % (critical, before, after)

    if offset is not None:
        target = "%d:%d" % (offset, n)
    elif gte is None:
        raise ValueError("encode_vlv_control needs either offset or gte")
    else:
        # Work on bytes so the checks and base64 behave the same whether
        # the caller passed text or bytes (b64encode rejects text on
        # Python 3).
        if isinstance(gte, bytes):
            raw = gte
        else:
            raw = gte.encode('utf8')

        needs_base64 = b':' in raw or b'\x00' in raw
        if not needs_base64 and not isinstance(gte, str):
            # "%s" % bytes would give "b'...'" on Python 3.
            try:
                gte = raw.decode('utf8')
            except UnicodeDecodeError:
                needs_base64 = True

        if needs_base64:
            target = "base64>=%s" % base64.b64encode(raw).decode('utf8')
        else:
            target = ">=%s" % gte

    if cookie is None:
        return prefix + target

    return prefix + target + ':' + cookie
81
82
def get_cookie(controls, expected_n=None):
    """Get the cookie, STILL base64 encoded, or raise ValueError.

    Only the first control whose string form starts with 'vlv_resp' is
    looked at. Its last colon-separated field is returned as the
    cookie; the field three from the end is the item count, which must
    equal expected_n when expected_n is not None.
    """
    response = None
    for control in list(controls):
        text = str(control)
        if text.startswith('vlv_resp'):
            response = text
            break

    if response is None:
        raise ValueError("there is no VLV response")

    _head, count, _, cookie = response.rsplit(':', 3)
    if expected_n is not None and int(count) != expected_n:
        raise ValueError("Expected %s items, server said %s" %
                         (expected_n, count))
    return cookie
94
95
96 class VLVTests(samba.tests.TestCase):
97
98     def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
99         name = "%s%d%s" % (prefix, i, suffix)
100         user = {
101             'cn': name,
102             "objectclass": "user",
103             'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
104             "roomNumber": "%sbc" % (n - i),
105             "carLicense": "后来经",
106             "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
107             "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
108             "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
109             "flags": str(i * (n - i)),
110             "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
111                                             ' 3z}'[i & 3],
112                                             '"@'[i & 1],),
113         }
114
115         # _user_broken_attrs tests are broken due to problems outside
116         # of VLV.
117         _user_broken_attrs = {
118             # Sort doesn't look past a NUL byte.
119             "photo": "\x00%d" % (n - i),
120             "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
121                                                              chr(i & 255), i),
122             "displayNamePrintable": "%d\x00%c" % (i, i & 255),
123             "adminDisplayName": "%d\x00b" % (n - i),
124             "title": "%d%sb" % (n - i, '\x00' * i),
125             "comment": "Favourite colour is %d" % (n % (i + 1)),
126
127             # Names that vary only in case. Windows returns
128             # equivalent addresses in the order they were put
129             # in ('a st', 'A st',...).
130             "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
131         }
132
133         if attrs is not None:
134             user.update(attrs)
135
136         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
137
138         if opts.skip_attr_regex:
139             match = re.compile(opts.skip_attr_regex).search
140             for k in user.keys():
141                 if match(k):
142                     del user[k]
143
144         self.users.append(user)
145         self.ldb.add(user)
146         return user
147
148     def setUp(self):
149         super(VLVTests, self).setUp()
150         self.ldb = SamDB(host, credentials=creds,
151                          session_info=system_session(lp), lp=lp)
152
153         self.base_dn = self.ldb.domain_dn()
154         self.ou = "ou=vlv,%s" % self.base_dn
155         if opts.delete_in_setup:
156             try:
157                 self.ldb.delete(self.ou, ['tree_delete:1'])
158             except ldb.LdbError as e:
159                 print("tried deleting %s, got error %s" % (self.ou, e))
160         self.ldb.add({
161             "dn": self.ou,
162             "objectclass": "organizationalUnit"})
163
164         self.users = []
165         for i in range(N_ELEMENTS):
166             self.create_user(i, N_ELEMENTS)
167
168         attrs = self.users[0].keys()
169         self.binary_sorted_keys = ['audio',
170                                    'photo',
171                                    "msTSExpireDate4",
172                                    'serialNumber',
173                                    "displayNamePrintable"]
174
175         self.numeric_sorted_keys = ['flags',
176                                     'accountExpires']
177
178         self.timestamp_keys = ['msTSExpireDate4']
179
180         self.int64_keys = set(['accountExpires'])
181
182         self.locale_sorted_keys = [x for x in attrs if
183                                    x not in (self.binary_sorted_keys +
184                                              self.numeric_sorted_keys)]
185
186         # don't try spaces, etc in cn
187         self.delicate_keys = ['cn']
188
189     def tearDown(self):
190         super(VLVTests, self).tearDown()
191         if not opts.delete_in_setup:
192             self.ldb.delete(self.ou, ['tree_delete:1'])
193
194     def get_full_list(self, attr, include_cn=False):
195         """Fetch the whole list sorted on the attribute, using the VLV.
196         This way you get a VLV cookie."""
197         n_users = len(self.users)
198         sort_control = "server_sort:1:0:%s" % attr
199         half_n = n_users // 2
200         vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
201         attrs = [attr]
202         if include_cn:
203             attrs.append('cn')
204         res = self.ldb.search(self.ou,
205                               scope=ldb.SCOPE_ONELEVEL,
206                               attrs=attrs,
207                               controls=[sort_control,
208                                         vlv_search])
209         if include_cn:
210             full_results = [(x[attr][0], x['cn'][0]) for x in res]
211         else:
212             full_results = [x[attr][0].lower() for x in res]
213         controls = res.controls
214         return full_results, controls, sort_control
215
216     def get_expected_order(self, attr, expression=None):
217         """Fetch the whole list sorted on the attribute, using sort only."""
218         sort_control = "server_sort:1:0:%s" % attr
219         res = self.ldb.search(self.ou,
220                               scope=ldb.SCOPE_ONELEVEL,
221                               expression=expression,
222                               attrs=[attr],
223                               controls=[sort_control])
224         results = [x[attr][0] for x in res]
225         return results
226
227     def delete_user(self, user):
228         self.ldb.delete(user['dn'])
229         del self.users[self.users.index(user)]
230
231     def get_gte_tests_and_order(self, attr, expression=None):
232         expected_order = self.get_expected_order(attr, expression=expression)
233         gte_users = []
234         if attr in self.delicate_keys:
235             gte_keys = [
236                 '3',
237                 'abc',
238                 '¹',
239                 'ŋđ¼³ŧ“«đð',
240                 '桑巴',
241             ]
242         elif attr in self.timestamp_keys:
243             gte_keys = [
244                 '18560101010000.0Z',
245                 '19140103010000.0Z',
246                 '19560101010010.0Z',
247                 '19700101000000.0Z',
248                 '19991231211234.3Z',
249                 '20061111211234.0Z',
250                 '20390901041234.0Z',
251                 '25560101010000.0Z',
252             ]
253         elif attr not in self.numeric_sorted_keys:
254             gte_keys = [
255                 '3',
256                 'abc',
257                 ' ',
258                 '!@#!@#!',
259                 'kōkako',
260                 '¹',
261                 'ŋđ¼³ŧ“«đð',
262                 '\n\t\t',
263                 '桑巴',
264                 'zzzz',
265             ]
266             if expected_order:
267                 gte_keys.append(expected_order[len(expected_order) // 2] + ' tail')
268
269         else:
270             # "numeric" means positive integers
271             # doesn't work with -1, 3.14, ' 3', '9' * 20
272             gte_keys = ['3',
273                         '1' * 10,
274                         '1',
275                         '9' * 7,
276                         '0']
277
278             if attr in self.int64_keys:
279                 gte_keys += ['3' * 12, '71' * 8]
280
281         for i, x in enumerate(gte_keys):
282             user = self.create_user(i, N_ELEMENTS,
283                                     prefix='gte',
284                                     attrs={attr: x})
285             gte_users.append(user)
286
287         gte_order = self.get_expected_order(attr)
288         for user in gte_users:
289             self.delete_user(user)
290
291         # for sanity's sake
292         expected_order_2 = self.get_expected_order(attr, expression=expression)
293         self.assertEqual(expected_order, expected_order_2)
294
295         # Map gte tests to indexes in expected order. This will break
296         # if gte_order and expected_order are differently ordered (as
297         # it should).
298         gte_map = {}
299
300         # index to the first one with each value
301         index_map = {}
302         for i, k in enumerate(expected_order):
303             if k not in index_map:
304                 index_map[k] = i
305
306         keys = []
307         for k in gte_order:
308             if k in index_map:
309                 i = index_map[k]
310                 gte_map[k] = i
311                 for k in keys:
312                     gte_map[k] = i
313                 keys = []
314             else:
315                 keys.append(k)
316
317         for k in keys:
318             gte_map[k] = len(expected_order)
319
320         if False:
321             print("gte_map:")
322             for k in gte_order:
323                 print("   %10s => %10s" % (k, gte_map[k]))
324
325         return gte_order, expected_order, gte_map
326
327     def assertCorrectResults(self, results, expected_order,
328                              offset, before, after):
329         """A helper to calculate offsets correctly and say as much as possible
330         when something goes wrong."""
331
332         start = max(offset - before - 1, 0)
333         end = offset + after
334         expected_results = expected_order[start: end]
335
336         # if it is a tuple with the cn, drop the cn
337         if expected_results and isinstance(expected_results[0], tuple):
338             expected_results = [x[0] for x in expected_results]
339
340         if expected_results == results:
341             return
342
343         if expected_order is not None:
344             print("expected order: %s" % expected_order[:20])
345             if len(expected_order) > 20:
346                 print("... and %d more not shown" % (len(expected_order) - 20))
347
348         print("offset %d before %d after %d" % (offset, before, after))
349         print("start %d end %d" % (start, end))
350         print("expected: %s" % expected_results)
351         print("got     : %s" % results)
352         self.assertEquals(expected_results, results)
353
354     def test_server_vlv_with_cookie(self):
355         attrs = [x for x in self.users[0].keys() if x not in
356                  ('dn', 'objectclass')]
357         for attr in attrs:
358             expected_order = self.get_expected_order(attr)
359             sort_control = "server_sort:1:0:%s" % attr
360             res = None
361             n = len(self.users)
362             for before in [10, 0, 3, 1, 4, 5, 2]:
363                 for after in [0, 3, 1, 4, 5, 2, 7]:
364                     for offset in range(max(1, before - 2),
365                                         min(n - after + 2, n)):
366                         if res is None:
367                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
368                                                                offset)
369                         else:
370                             cookie = get_cookie(res.controls, n)
371                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
372                                           (before, after, offset, n,
373                                            cookie))
374
375                         res = self.ldb.search(self.ou,
376                                               scope=ldb.SCOPE_ONELEVEL,
377                                               attrs=[attr],
378                                               controls=[sort_control,
379                                                         vlv_search])
380
381                         results = [x[attr][0] for x in res]
382
383                         self.assertCorrectResults(results, expected_order,
384                                                   offset, before, after)
385
386     def run_index_tests_with_expressions(self, expressions):
387         # Here we don't test every before/after combination.
388         attrs = [x for x in self.users[0].keys() if x not in
389                  ('dn', 'objectclass')]
390         for attr in attrs:
391             for expression in expressions:
392                 expected_order = self.get_expected_order(attr, expression)
393                 sort_control = "server_sort:1:0:%s" % attr
394                 res = None
395                 n = len(expected_order)
396                 for before in range(0, 11):
397                     after = before
398                     for offset in range(max(1, before - 2),
399                                         min(n - after + 2, n)):
400                         if res is None:
401                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
402                                                                offset)
403                         else:
404                             cookie = get_cookie(res.controls)
405                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
406                                           (before, after, offset, n,
407                                            cookie))
408
409                         res = self.ldb.search(self.ou,
410                                               expression=expression,
411                                               scope=ldb.SCOPE_ONELEVEL,
412                                               attrs=[attr],
413                                               controls=[sort_control,
414                                                         vlv_search])
415
416                         results = [x[attr][0] for x in res]
417
418                         self.assertCorrectResults(results, expected_order,
419                                                   offset, before, after)
420
421     def test_server_vlv_with_expression(self):
422         """What happens when we run the VLV with an expression?"""
423         expressions = ["(objectClass=*)",
424                        "(cn=%s)" % self.users[-1]['cn'],
425                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
426                        ]
427         self.run_index_tests_with_expressions(expressions)
428
429     def test_server_vlv_with_failing_expression(self):
430         """What happens when we run the VLV on an expression that matches
431         nothing?"""
432         expressions = ["(samaccountname=testferf)",
433                        "(cn=hefalump)",
434                        ]
435         self.run_index_tests_with_expressions(expressions)
436
437     def run_gte_tests_with_expressions(self, expressions):
438         # Here we don't test every before/after combination.
439         attrs = [x for x in self.users[0].keys() if x not in
440                  ('dn', 'objectclass')]
441         for expression in expressions:
442             for attr in attrs:
443                 gte_order, expected_order, gte_map = \
444                     self.get_gte_tests_and_order(attr, expression)
445                 # In case there is some order dependency, disorder tests
446                 gte_tests = gte_order[:]
447                 random.seed(2)
448                 random.shuffle(gte_tests)
449                 res = None
450                 sort_control = "server_sort:1:0:%s" % attr
451
452                 expected_order = self.get_expected_order(attr, expression)
453                 sort_control = "server_sort:1:0:%s" % attr
454                 res = None
455                 for before in range(0, 11):
456                     after = before
457                     for gte in gte_tests:
458                         if res is not None:
459                             cookie = get_cookie(res.controls)
460                         else:
461                             cookie = None
462                         vlv_search = encode_vlv_control(before=before,
463                                                         after=after,
464                                                         gte=gte,
465                                                         cookie=cookie)
466
467                         res = self.ldb.search(self.ou,
468                                               scope=ldb.SCOPE_ONELEVEL,
469                                               expression=expression,
470                                               attrs=[attr],
471                                               controls=[sort_control,
472                                                         vlv_search])
473
474                         results = [x[attr][0] for x in res]
475                         offset = gte_map.get(gte, len(expected_order))
476
477                         # here offset is 0-based
478                         start = max(offset - before, 0)
479                         end = offset + 1 + after
480
481                         expected_results = expected_order[start: end]
482
483                         self.assertEquals(expected_results, results)
484
485     def test_vlv_gte_with_expression(self):
486         """What happens when we run the VLV with an expression?"""
487         expressions = ["(objectClass=*)",
488                        "(cn=%s)" % self.users[-1]['cn'],
489                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
490                        ]
491         self.run_gte_tests_with_expressions(expressions)
492
493     def test_vlv_gte_with_failing_expression(self):
494         """What happens when we run the VLV on an expression that matches
495         nothing?"""
496         expressions = ["(samaccountname=testferf)",
497                        "(cn=hefalump)",
498                        ]
499         self.run_gte_tests_with_expressions(expressions)
500
501     def test_server_vlv_with_cookie_while_adding_and_deleting(self):
502         """What happens if we add or remove items in the middle of the VLV?
503
504         Nothing. The search and the sort is not repeated, and we only
505         deal with the objects originally found.
506         """
507         attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
508                           ('dn', 'objectclass')]
509         user_number = 0
510         iteration = 0
511         for attr in attrs:
512             full_results, controls, sort_control = \
513                             self.get_full_list(attr, True)
514             original_n = len(self.users)
515
516             expected_order = full_results
517             random.seed(1)
518
519             for before in range(0, 3) + [6, 11, 19]:
520                 for after in range(0, 3) + [6, 11, 19]:
521                     start = max(before - 1, 1)
522                     end = max(start + 4, original_n - after + 2)
523                     for offset in range(start, end):
524                         # if iteration > 2076:
525                         #    return
526                         cookie = get_cookie(controls, original_n)
527                         vlv_search = encode_vlv_control(before=before,
528                                                         after=after,
529                                                         offset=offset,
530                                                         n=original_n,
531                                                         cookie=cookie)
532
533                         iteration += 1
534                         res = self.ldb.search(self.ou,
535                                               scope=ldb.SCOPE_ONELEVEL,
536                                               attrs=[attr],
537                                               controls=[sort_control,
538                                                         vlv_search])
539
540                         controls = res.controls
541                         results = [x[attr][0] for x in res]
542                         real_offset = max(1, min(offset, len(expected_order)))
543
544                         expected_results = []
545                         skipped = 0
546                         begin_offset = max(real_offset - before - 1, 0)
547                         real_before = min(before, real_offset - 1)
548                         real_after = min(after,
549                                          len(expected_order) - real_offset)
550
551                         for x in expected_order[begin_offset:]:
552                             if x is not None:
553                                 expected_results.append(x[0])
554                                 if (len(expected_results) ==
555                                     real_before + real_after + 1):
556                                     break
557                             else:
558                                 skipped += 1
559
560                         if expected_results != results:
561                             print("attr %s before %d after %d offset %d" %
562                                    (attr, before, after, offset))
563                         self.assertEquals(expected_results, results)
564
565                         n = len(self.users)
566                         if random.random() < 0.1 + (n < 5) * 0.05:
567                             if n == 0:
568                                 i = 0
569                             else:
570                                 i = random.randrange(n)
571                             user = self.create_user(i, n, suffix='-%s' %
572                                                     user_number)
573                             user_number += 1
574                         if random.random() < 0.1  + (n > 50) * 0.02 and n:
575                             index = random.randrange(n)
576                             user = self.users.pop(index)
577
578                             self.ldb.delete(user['dn'])
579
580                             replaced = (user[attr], user['cn'])
581                             if replaced in expected_order:
582                                 i = expected_order.index(replaced)
583                                 expected_order[i] = None
584
585     def test_server_vlv_with_cookie_while_changing(self):
586         """What happens if we modify items in the middle of the VLV?
587
588         The expected behaviour (as found on Windows) is the sort is
589         not repeated, but the changes in attributes are reflected.
590         """
591         attrs = [x for x in self.users[0].keys() if x not in
592                  ('dn', 'objectclass', 'cn')]
593         for attr in attrs:
594             n_users = len(self.users)
595             expected_order = [x.upper() for x in self.get_expected_order(attr)]
596             sort_control = "server_sort:1:0:%s" % attr
597             res = None
598             i = 0
599
600             # First we'll fetch the whole list so we know the original
601             # sort order. This is necessary because we don't know how
602             # the server will order equivalent items. We are using the
603             # dn as a key.
604             half_n = n_users // 2
605             vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
606             res = self.ldb.search(self.ou,
607                                   scope=ldb.SCOPE_ONELEVEL,
608                                   attrs=['dn', attr],
609                                   controls=[sort_control, vlv_search])
610
611             results = [x[attr][0].upper() for x in res]
612             #self.assertEquals(expected_order, results)
613
614             dn_order = [str(x['dn']) for x in res]
615             values = results[:]
616
617             for before in range(0, 3):
618                 for after in range(0, 3):
619                     for offset in range(1 + before, n_users - after):
620                         cookie = get_cookie(res.controls, len(self.users))
621                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
622                                       (before, after, offset, len(self.users),
623                                        cookie))
624
625                         res = self.ldb.search(self.ou,
626                                               scope=ldb.SCOPE_ONELEVEL,
627                                               attrs=['dn', attr],
628                                               controls=[sort_control,
629                                                         vlv_search])
630
631                         dn_results = [str(x['dn']) for x in res]
632                         dn_expected = dn_order[offset - before - 1:
633                                                offset + after]
634
635                         self.assertEquals(dn_expected, dn_results)
636
637                         results = [x[attr][0].upper() for x in res]
638
639                         self.assertCorrectResults(results, values,
640                                                   offset, before, after)
641
642                         i += 1
643                         if i % 3 == 2:
644                             if (attr in self.locale_sorted_keys or
645                                 attr in self.binary_sorted_keys):
646                                 i1 = i % n_users
647                                 i2 = (i ^ 255) % n_users
648                                 dn1 = dn_order[i1]
649                                 dn2 = dn_order[i2]
650                                 v2 = values[i2]
651
652                                 if v2 in self.locale_sorted_keys:
653                                     v2 += '-%d' % i
654                                 cn1 = dn1.split(',', 1)[0][3:]
655                                 cn2 = dn2.split(',', 1)[0][3:]
656
657                                 values[i1] = v2
658
659                                 m = ldb.Message()
660                                 m.dn = ldb.Dn(self.ldb, dn1)
661                                 m[attr] = ldb.MessageElement(v2,
662                                                              ldb.FLAG_MOD_REPLACE,
663                                                              attr)
664
665                                 self.ldb.modify(m)
666
667     def test_server_vlv_fractions_with_cookie(self):
668         """What happens when the count is set to an arbitrary number?
669
670         In that case the offset and the count form a fraction, and the
671         VLV should be centred at a point offset/count of the way
672         through. For example, if offset is 3 and count is 6, the VLV
673         should be looking around halfway. The actual algorithm is a
674         bit fiddlier than that, because of the one-basedness of VLV.
675         """
676         attrs = [x for x in self.users[0].keys() if x not in
677                  ('dn', 'objectclass')]
678
679         n_users = len(self.users)
680
681         random.seed(4)
682
683         for attr in attrs:
684             full_results, controls, sort_control = self.get_full_list(attr)
685             self.assertEqual(len(full_results), n_users)
686             for before in range(0, 2):
687                 for after in range(0, 2):
688                     for denominator in range(1, 20):
689                         for offset in range(1, denominator + 3):
690                             cookie = get_cookie(controls, len(self.users))
691                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
692                                           (before, after, offset,
693                                            denominator,
694                                            cookie))
695                             try:
696                                 res = self.ldb.search(self.ou,
697                                                       scope=ldb.SCOPE_ONELEVEL,
698                                                       attrs=[attr],
699                                                       controls=[sort_control,
700                                                                 vlv_search])
701                             except ldb.LdbError as e:
702                                 if offset != 0:
703                                     raise
704                                 print("offset %d denominator %d raised error "
705                                        "expected error %s\n"
706                                        "(offset zero is illegal unless "
707                                        "content count is zero)" %
708                                        (offset, denominator, e))
709                                 continue
710
711                             results = [x[attr][0].lower() for x in res]
712
713                             if denominator == 0:
714                                 denominator = n_users
715                                 if offset == 0:
716                                     offset = denominator
717                             elif denominator == 1:
718                                 # the offset can only be 1, but the 1/1 case
719                                 # means something special
720                                 if offset == 1:
721                                     real_offset = n_users
722                                 else:
723                                     real_offset = 1
724                             else:
725                                 if offset > denominator:
726                                     offset = denominator
727                                 real_offset = (1 +
728                                                int(round((n_users - 1) *
729                                                          (offset - 1) /
730                                                          (denominator - 1.0)))
731                                                )
732
733                             self.assertCorrectResults(results, full_results,
734                                                       real_offset, before,
735                                                       after)
736
737                             controls = res.controls
738                             if False:
739                                 for c in list(controls):
740                                     cstr = str(c)
741                                     if cstr.startswith('vlv_resp'):
742                                         bits = cstr.rsplit(':')
743                                         print("the answer is %s; we said %d" %
744                                                (bits[2], real_offset))
745                                         break
746
747     def test_server_vlv_no_cookie(self):
748         attrs = [x for x in self.users[0].keys() if x not in
749                  ('dn', 'objectclass')]
750
751         for attr in attrs:
752             expected_order = self.get_expected_order(attr)
753             sort_control = "server_sort:1:0:%s" % attr
754             for before in range(0, 5):
755                 for after in range(0, 7):
756                     for offset in range(1 + before, len(self.users) - after):
757                         res = self.ldb.search(self.ou,
758                                               scope=ldb.SCOPE_ONELEVEL,
759                                               attrs=[attr],
760                                               controls=[sort_control,
761                                                         "vlv:1:%d:%d:%d:0" %
762                                                         (before, after,
763                                                          offset)])
764                         results = [x[attr][0] for x in res]
765                         self.assertCorrectResults(results, expected_order,
766                                                   offset, before, after)
767
768     def get_expected_order_showing_deleted(self, attr,
769                                            expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
770                                            base=None,
771                                            scope=ldb.SCOPE_SUBTREE
772                                            ):
773         """Fetch the whole list sorted on the attribute, using sort only,
774         searching in the entire tree, not just our OU. This is the
775         way to find deleted objects.
776         """
777         if base is None:
778             base = self.base_dn
779         sort_control = "server_sort:1:0:%s" % attr
780         controls = [sort_control, "show_deleted:1"]
781
782         res = self.ldb.search(base,
783                               scope=scope,
784                               expression=expression,
785                               attrs=[attr],
786                               controls=controls)
787         results = [x[attr][0] for x in res]
788         return results
789
790     def add_deleted_users(self, n):
791         deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
792                          for i in range(n)]
793
794         for user in deleted_users:
795             self.delete_user(user)
796
797     def test_server_vlv_no_cookie_show_deleted(self):
798         """What do we see with the show_deleted control?"""
799         attrs = ['objectGUID',
800                  'cn',
801                  'sAMAccountName',
802                  'objectSid',
803                  'name',
804                  'whenChanged',
805                  'usnChanged'
806                  ]
807
808         # add some deleted users first, just in case there are none
809         self.add_deleted_users(6)
810         random.seed(22)
811         expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
812
813         for attr in attrs:
814             show_deleted_control = "show_deleted:1"
815             expected_order = self.get_expected_order_showing_deleted(attr,
816                                                                      expression)
817             n = len(expected_order)
818             sort_control = "server_sort:1:0:%s" % attr
819             for before in [3, 1, 0]:
820                 for after in [0, 2]:
821                     # don't test every position, because there could be hundreds.
822                     # jump back and forth instead
823                     for i in range(20):
824                         offset = random.randrange(max(1, before - 2),
825                                                   min(n - after + 2, n))
826                         res = self.ldb.search(self.base_dn,
827                                               expression=expression,
828                                               scope=ldb.SCOPE_SUBTREE,
829                                               attrs=[attr],
830                                               controls=[sort_control,
831                                                         show_deleted_control,
832                                                         "vlv:1:%d:%d:%d:0" %
833                                                         (before, after,
834                                                          offset)
835                                                         ]
836                                               )
837                         results = [x[attr][0] for x in res]
838                         self.assertCorrectResults(results, expected_order,
839                                                   offset, before, after)
840
841     def test_server_vlv_no_cookie_show_deleted_only(self):
842         """What do we see with the show_deleted control when we're not looking
843         at any non-deleted things"""
844         attrs = ['objectGUID',
845                  'cn',
846                  'sAMAccountName',
847                  'objectSid',
848                  'whenChanged',
849                  ]
850
851         # add some deleted users first, just in case there are none
852         self.add_deleted_users(4)
853         base = 'CN=Deleted Objects,%s' % self.base_dn
854         expression = "(cn=vlv-deleted*)"
855         for attr in attrs:
856             show_deleted_control = "show_deleted:1"
857             expected_order = self.get_expected_order_showing_deleted(attr,
858                                                                      expression=expression,
859                                                                      base=base,
860                                                                      scope=ldb.SCOPE_ONELEVEL)
861             print("searching for attr %s amongst %d deleted objects" %
862                    (attr, len(expected_order)))
863             sort_control = "server_sort:1:0:%s" % attr
864             step = max(len(expected_order) // 10, 1)
865             for before in [3, 0]:
866                 for after in [0, 2]:
867                     for offset in range(1 + before,
868                                         len(expected_order) - after,
869                                         step):
870                         res = self.ldb.search(base,
871                                               expression=expression,
872                                               scope=ldb.SCOPE_ONELEVEL,
873                                               attrs=[attr],
874                                               controls=[sort_control,
875                                                         show_deleted_control,
876                                                         "vlv:1:%d:%d:%d:0" %
877                                                         (before, after,
878                                                          offset)])
879                         results = [x[attr][0] for x in res]
880                         self.assertCorrectResults(results, expected_order,
881                                                   offset, before, after)
882
883     def test_server_vlv_with_cookie_show_deleted(self):
884         """What do we see with the show_deleted control?"""
885         attrs = ['objectGUID',
886                  'cn',
887                  'sAMAccountName',
888                  'objectSid',
889                  'name',
890                  'whenChanged',
891                  'usnChanged'
892                  ]
893         self.add_deleted_users(6)
894         random.seed(23)
895         for attr in attrs:
896             expected_order = self.get_expected_order(attr)
897             sort_control = "server_sort:1:0:%s" % attr
898             res = None
899             show_deleted_control = "show_deleted:1"
900             expected_order = self.get_expected_order_showing_deleted(attr)
901             n = len(expected_order)
902             expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
903             for before in [3, 2, 1, 0]:
904                 after = before
905                 for i in range(20):
906                     offset = random.randrange(max(1, before - 2),
907                                               min(n - after + 2, n))
908                     if res is None:
909                         vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
910                                                            offset)
911                     else:
912                         cookie = get_cookie(res.controls, n)
913                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
914                                       (before, after, offset, n,
915                                        cookie))
916
917                     res = self.ldb.search(self.base_dn,
918                                           expression=expression,
919                                           scope=ldb.SCOPE_SUBTREE,
920                                           attrs=[attr],
921                                           controls=[sort_control,
922                                                     vlv_search,
923                                                     show_deleted_control])
924
925                     results = [x[attr][0] for x in res]
926
927                     self.assertCorrectResults(results, expected_order,
928                                               offset, before, after)
929
930     def test_server_vlv_gte_with_cookie(self):
931         attrs = [x for x in self.users[0].keys() if x not in
932                  ('dn', 'objectclass')]
933         for attr in attrs:
934             gte_order, expected_order, gte_map = \
935                                         self.get_gte_tests_and_order(attr)
936             # In case there is some order dependency, disorder tests
937             gte_tests = gte_order[:]
938             random.seed(1)
939             random.shuffle(gte_tests)
940             res = None
941             sort_control = "server_sort:1:0:%s" % attr
942             for before in [0, 1, 2, 4]:
943                 for after in [0, 1, 3, 6]:
944                     for gte in gte_tests:
945                         if res is not None:
946                             cookie = get_cookie(res.controls, len(self.users))
947                         else:
948                             cookie = None
949                         vlv_search = encode_vlv_control(before=before,
950                                                         after=after,
951                                                         gte=gte,
952                                                         cookie=cookie)
953
954                         res = self.ldb.search(self.ou,
955                                               scope=ldb.SCOPE_ONELEVEL,
956                                               attrs=[attr],
957                                               controls=[sort_control,
958                                                         vlv_search])
959
960                         results = [x[attr][0] for x in res]
961                         offset = gte_map.get(gte, len(expected_order))
962
963                         # here offset is 0-based
964                         start = max(offset - before, 0)
965                         end = offset + 1 + after
966
967                         expected_results = expected_order[start: end]
968
969                         self.assertEquals(expected_results, results)
970
971     def test_server_vlv_gte_no_cookie(self):
972         attrs = [x for x in self.users[0].keys() if x not in
973                  ('dn', 'objectclass')]
974         iteration = 0
975         for attr in attrs:
976             gte_order, expected_order, gte_map = \
977                                         self.get_gte_tests_and_order(attr)
978             # In case there is some order dependency, disorder tests
979             gte_tests = gte_order[:]
980             random.seed(1)
981             random.shuffle(gte_tests)
982
983             sort_control = "server_sort:1:0:%s" % attr
984             for before in [0, 1, 3]:
985                 for after in [0, 4]:
986                     for gte in gte_tests:
987                         vlv_search = encode_vlv_control(before=before,
988                                                         after=after,
989                                                         gte=gte)
990
991                         res = self.ldb.search(self.ou,
992                                               scope=ldb.SCOPE_ONELEVEL,
993                                               attrs=[attr],
994                                               controls=[sort_control,
995                                                         vlv_search])
996                         results = [x[attr][0] for x in res]
997
998                         # here offset is 0-based
999                         offset = gte_map.get(gte, len(expected_order))
1000                         start = max(offset - before, 0)
1001                         end = offset + after + 1
1002                         expected_results = expected_order[start: end]
1003                         iteration += 1
1004                         if expected_results != results:
1005                             middle = expected_order[len(expected_order) // 2]
1006                             print(expected_results, results)
1007                             print(middle)
1008                             print(expected_order)
1009                             print()
1010                             print("\nattr %s offset %d before %d "
1011                                    "after %d gte %s" %
1012                                    (attr, offset, before, after, gte))
1013                         self.assertEquals(expected_results, results)
1014
1015     def test_multiple_searches(self):
1016         """The maximum number of concurrent vlv searches per connection is
1017         currently set at 3. That means if you open 4 VLV searches the
1018         cookie on the first one should fail.
1019         """
1020         # Windows has a limit of 10 VLVs where there are low numbers
1021         # of objects in each search.
1022         attrs = ([x for x in self.users[0].keys() if x not in
1023                   ('dn', 'objectclass')] * 2)[:12]
1024
1025         vlv_cookies = []
1026         for attr in attrs:
1027             sort_control = "server_sort:1:0:%s" % attr
1028
1029             res = self.ldb.search(self.ou,
1030                                   scope=ldb.SCOPE_ONELEVEL,
1031                                   attrs=[attr],
1032                                   controls=[sort_control,
1033                                             "vlv:1:1:1:1:0"])
1034
1035             cookie = get_cookie(res.controls, len(self.users))
1036             vlv_cookies.append(cookie)
1037             time.sleep(0.2)
1038
1039         # now this one should fail
1040         self.assertRaises(ldb.LdbError,
1041                           self.ldb.search,
1042                           self.ou,
1043                           scope=ldb.SCOPE_ONELEVEL,
1044                           attrs=[attr],
1045                           controls=[sort_control,
1046                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1047
1048         # and this one should succeed
1049         res = self.ldb.search(self.ou,
1050                               scope=ldb.SCOPE_ONELEVEL,
1051                               attrs=[attr],
1052                               controls=[sort_control,
1053                                         "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1054
1055         # this one should fail because it is a new connection and
1056         # doesn't share cookies
1057         new_ldb = SamDB(host, credentials=creds,
1058                         session_info=system_session(lp), lp=lp)
1059
1060         self.assertRaises(ldb.LdbError,
1061                           new_ldb.search, self.ou,
1062                           scope=ldb.SCOPE_ONELEVEL,
1063                           attrs=[attr],
1064                           controls=[sort_control,
1065                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1066
1067         # but now without the critical flag it just does no VLV.
1068         new_ldb.search(self.ou,
1069                        scope=ldb.SCOPE_ONELEVEL,
1070                        attrs=[attr],
1071                        controls=[sort_control,
1072                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1073
1074
# A host given without a URL scheme gets one added: "tdb://" when it
# names an existing file, "ldap://" otherwise.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


TestProgram(module=__name__, opts=subunitopts)