PY3: change shebang to python3 in source4/dsdb dir
[amitay/samba.git] / source4 / dsdb / tests / python / vlv.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import base64
9 import random
10 import re
11
12 sys.path.insert(0, "bin/python")
13 import samba
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 import ldb
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
23
24 import time
25
# Command-line handling: the usual Samba option groups followed by the
# switches that are specific to this test script.
parser = optparse.OptionParser("vlv.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# Use credentials from the command line if they were supplied.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

parser.add_option('--elements', type='int', default=20,
                  help="use this many elements in the tests")
parser.add_option('--delete-in-setup', action='store_true',
                  help="cleanup in next setup rather than teardown")
parser.add_option('--skip-attr-regex',
                  help="ignore attributes matching this regex")

opts, args = parser.parse_args()

# The server to test against is the one mandatory positional argument.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# Number of users created for each test (see --elements).
N_ELEMENTS = opts.elements
57
58
class VlvTestException(Exception):
    """Exception class defined for the VLV tests in this file."""
61
62
def encode_vlv_control(critical=1,
                       before=0, after=0,
                       offset=None,
                       gte=None,
                       n=0, cookie=None):
    """Build the string form of a VLV request control.

    The result starts with "vlv:<critical>:<before>:<after>:" and is
    followed by one of:

      * "<offset>:<n>" when offset is given (offset wins over gte);
      * "base64>=<value>" when gte contains b':' or a NUL byte, with
        the value base64 encoded;
      * ">=<value>" otherwise.

    If cookie is not None, ":<cookie>" is appended.

    gte is tested against bytes literals, so it is expected to be a
    bytes value.

    Raises ValueError if neither offset nor gte is supplied, because
    there is then no target to encode.
    """
    if offset is None and gte is None:
        raise ValueError("encode_vlv_control needs either offset or gte")

    s = "vlv:%d:%d:%d:" % (critical, before, after)

    if offset is not None:
        m = "%d:%d" % (offset, n)
    elif b':' in gte or b'\x00' in gte:
        # ':' is the field separator of the control string, so values
        # containing it (or a NUL) are passed base64 encoded instead.
        gte = get_string(base64.b64encode(gte))
        m = "base64>=%s" % gte
    else:
        m = ">=%s" % get_string(gte)

    if cookie is None:
        return s + m

    return s + m + ':' + cookie
83
84
def get_cookie(controls, expected_n=None):
    """Return the cookie from the first VLV response control.

    Each control is converted with str(); the first one whose text
    starts with 'vlv_resp' is split on its last three ':' separators
    and the final field (the cookie, left base64 encoded) is returned.

    If expected_n is not None, the third field from the end is
    compared with it as an integer and ValueError is raised on a
    mismatch. ValueError is also raised when no VLV response control
    is present.
    """
    for text in map(str, list(controls)):
        if not text.startswith('vlv_resp'):
            continue
        _, count, _, cookie = text.rsplit(':', 3)
        if expected_n is not None and int(count) != expected_n:
            raise ValueError("Expected %s items, server said %s" %
                             (expected_n, count))
        return cookie
    raise ValueError("there is no VLV response")
96
97
98 class VLVTests(samba.tests.TestCase):
99
100     def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
101         name = "%s%d%s" % (prefix, i, suffix)
102         user = {
103             'cn': name,
104             "objectclass": "user",
105             'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
106             "roomNumber": "%sbc" % (n - i),
107             "carLicense": "后来经",
108             "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
109             "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
110             "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
111             "flags": str(i * (n - i)),
112             "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
113                                             ' 3z}'[i & 3],
114                                             '"@'[i & 1],),
115         }
116
117         # _user_broken_attrs tests are broken due to problems outside
118         # of VLV.
119         _user_broken_attrs = {
120             # Sort doesn't look past a NUL byte.
121             "photo": "\x00%d" % (n - i),
122             "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
123                                                              chr(i & 255), i),
124             "displayNamePrintable": "%d\x00%c" % (i, i & 255),
125             "adminDisplayName": "%d\x00b" % (n - i),
126             "title": "%d%sb" % (n - i, '\x00' * i),
127             "comment": "Favourite colour is %d" % (n % (i + 1)),
128
129             # Names that vary only in case. Windows returns
130             # equivalent addresses in the order they were put
131             # in ('a st', 'A st',...).
132             "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
133         }
134
135         if attrs is not None:
136             user.update(attrs)
137
138         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
139
140         if opts.skip_attr_regex:
141             match = re.compile(opts.skip_attr_regex).search
142             for k in user.keys():
143                 if match(k):
144                     del user[k]
145
146         self.users.append(user)
147         self.ldb.add(user)
148         return user
149
150     def setUp(self):
151         super(VLVTests, self).setUp()
152         self.ldb = SamDB(host, credentials=creds,
153                          session_info=system_session(lp), lp=lp)
154
155         self.base_dn = self.ldb.domain_dn()
156         self.ou = "ou=vlv,%s" % self.base_dn
157         if opts.delete_in_setup:
158             try:
159                 self.ldb.delete(self.ou, ['tree_delete:1'])
160             except ldb.LdbError as e:
161                 print("tried deleting %s, got error %s" % (self.ou, e))
162         self.ldb.add({
163             "dn": self.ou,
164             "objectclass": "organizationalUnit"})
165
166         self.users = []
167         for i in range(N_ELEMENTS):
168             self.create_user(i, N_ELEMENTS)
169
170         attrs = self.users[0].keys()
171         self.binary_sorted_keys = ['audio',
172                                    'photo',
173                                    "msTSExpireDate4",
174                                    'serialNumber',
175                                    "displayNamePrintable"]
176
177         self.numeric_sorted_keys = ['flags',
178                                     'accountExpires']
179
180         self.timestamp_keys = ['msTSExpireDate4']
181
182         self.int64_keys = set(['accountExpires'])
183
184         self.locale_sorted_keys = [x for x in attrs if
185                                    x not in (self.binary_sorted_keys +
186                                              self.numeric_sorted_keys)]
187
188         # don't try spaces, etc in cn
189         self.delicate_keys = ['cn']
190
191     def tearDown(self):
192         super(VLVTests, self).tearDown()
193         if not opts.delete_in_setup:
194             self.ldb.delete(self.ou, ['tree_delete:1'])
195
196     def get_full_list(self, attr, include_cn=False):
197         """Fetch the whole list sorted on the attribute, using the VLV.
198         This way you get a VLV cookie."""
199         n_users = len(self.users)
200         sort_control = "server_sort:1:0:%s" % attr
201         half_n = n_users // 2
202         vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
203         attrs = [attr]
204         if include_cn:
205             attrs.append('cn')
206         res = self.ldb.search(self.ou,
207                               scope=ldb.SCOPE_ONELEVEL,
208                               attrs=attrs,
209                               controls=[sort_control,
210                                         vlv_search])
211         if include_cn:
212             full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
213         else:
214             full_results = [str(x[attr][0]).lower() for x in res]
215         controls = res.controls
216         return full_results, controls, sort_control
217
218     def get_expected_order(self, attr, expression=None):
219         """Fetch the whole list sorted on the attribute, using sort only."""
220         sort_control = "server_sort:1:0:%s" % attr
221         res = self.ldb.search(self.ou,
222                               scope=ldb.SCOPE_ONELEVEL,
223                               expression=expression,
224                               attrs=[attr],
225                               controls=[sort_control])
226         results = [x[attr][0] for x in res]
227         return results
228
229     def delete_user(self, user):
230         self.ldb.delete(user['dn'])
231         del self.users[self.users.index(user)]
232
233     def get_gte_tests_and_order(self, attr, expression=None):
234         expected_order = self.get_expected_order(attr, expression=expression)
235         gte_users = []
236         if attr in self.delicate_keys:
237             gte_keys = [
238                 '3',
239                 'abc',
240                 '¹',
241                 'ŋđ¼³ŧ“«đð',
242                 '桑巴',
243             ]
244         elif attr in self.timestamp_keys:
245             gte_keys = [
246                 '18560101010000.0Z',
247                 '19140103010000.0Z',
248                 '19560101010010.0Z',
249                 '19700101000000.0Z',
250                 '19991231211234.3Z',
251                 '20061111211234.0Z',
252                 '20390901041234.0Z',
253                 '25560101010000.0Z',
254             ]
255         elif attr not in self.numeric_sorted_keys:
256             gte_keys = [
257                 '3',
258                 'abc',
259                 ' ',
260                 '!@#!@#!',
261                 'kōkako',
262                 '¹',
263                 'ŋđ¼³ŧ“«đð',
264                 '\n\t\t',
265                 '桑巴',
266                 'zzzz',
267             ]
268             if expected_order:
269                 gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')
270
271         else:
272             # "numeric" means positive integers
273             # doesn't work with -1, 3.14, ' 3', '9' * 20
274             gte_keys = ['3',
275                         '1' * 10,
276                         '1',
277                         '9' * 7,
278                         '0']
279
280             if attr in self.int64_keys:
281                 gte_keys += ['3' * 12, '71' * 8]
282
283         for i, x in enumerate(gte_keys):
284             user = self.create_user(i, N_ELEMENTS,
285                                     prefix='gte',
286                                     attrs={attr: x})
287             gte_users.append(user)
288
289         gte_order = self.get_expected_order(attr)
290         for user in gte_users:
291             self.delete_user(user)
292
293         # for sanity's sake
294         expected_order_2 = self.get_expected_order(attr, expression=expression)
295         self.assertEqual(expected_order, expected_order_2)
296
297         # Map gte tests to indexes in expected order. This will break
298         # if gte_order and expected_order are differently ordered (as
299         # it should).
300         gte_map = {}
301
302         # index to the first one with each value
303         index_map = {}
304         for i, k in enumerate(expected_order):
305             if k not in index_map:
306                 index_map[k] = i
307
308         keys = []
309         for k in gte_order:
310             if k in index_map:
311                 i = index_map[k]
312                 gte_map[k] = i
313                 for k in keys:
314                     gte_map[k] = i
315                 keys = []
316             else:
317                 keys.append(k)
318
319         for k in keys:
320             gte_map[k] = len(expected_order)
321
322         if False:
323             print("gte_map:")
324             for k in gte_order:
325                 print("   %10s => %10s" % (k, gte_map[k]))
326
327         return gte_order, expected_order, gte_map
328
329     def assertCorrectResults(self, results, expected_order,
330                              offset, before, after):
331         """A helper to calculate offsets correctly and say as much as possible
332         when something goes wrong."""
333
334         start = max(offset - before - 1, 0)
335         end = offset + after
336         expected_results = expected_order[start: end]
337
338         # if it is a tuple with the cn, drop the cn
339         if expected_results and isinstance(expected_results[0], tuple):
340             expected_results = [x[0] for x in expected_results]
341
342         if expected_results == results:
343             return
344
345         if expected_order is not None:
346             print("expected order: %s" % expected_order[:20])
347             if len(expected_order) > 20:
348                 print("... and %d more not shown" % (len(expected_order) - 20))
349
350         print("offset %d before %d after %d" % (offset, before, after))
351         print("start %d end %d" % (start, end))
352         print("expected: %s" % expected_results)
353         print("got     : %s" % results)
354         self.assertEquals(expected_results, results)
355
356     def test_server_vlv_with_cookie(self):
357         attrs = [x for x in self.users[0].keys() if x not in
358                  ('dn', 'objectclass')]
359         for attr in attrs:
360             expected_order = self.get_expected_order(attr)
361             sort_control = "server_sort:1:0:%s" % attr
362             res = None
363             n = len(self.users)
364             for before in [10, 0, 3, 1, 4, 5, 2]:
365                 for after in [0, 3, 1, 4, 5, 2, 7]:
366                     for offset in range(max(1, before - 2),
367                                         min(n - after + 2, n)):
368                         if res is None:
369                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
370                                                                offset)
371                         else:
372                             cookie = get_cookie(res.controls, n)
373                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
374                                           (before, after, offset, n,
375                                            cookie))
376
377                         res = self.ldb.search(self.ou,
378                                               scope=ldb.SCOPE_ONELEVEL,
379                                               attrs=[attr],
380                                               controls=[sort_control,
381                                                         vlv_search])
382
383                         results = [x[attr][0] for x in res]
384
385                         self.assertCorrectResults(results, expected_order,
386                                                   offset, before, after)
387
388     def run_index_tests_with_expressions(self, expressions):
389         # Here we don't test every before/after combination.
390         attrs = [x for x in self.users[0].keys() if x not in
391                  ('dn', 'objectclass')]
392         for attr in attrs:
393             for expression in expressions:
394                 expected_order = self.get_expected_order(attr, expression)
395                 sort_control = "server_sort:1:0:%s" % attr
396                 res = None
397                 n = len(expected_order)
398                 for before in range(0, 11):
399                     after = before
400                     for offset in range(max(1, before - 2),
401                                         min(n - after + 2, n)):
402                         if res is None:
403                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
404                                                                offset)
405                         else:
406                             cookie = get_cookie(res.controls)
407                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
408                                           (before, after, offset, n,
409                                            cookie))
410
411                         res = self.ldb.search(self.ou,
412                                               expression=expression,
413                                               scope=ldb.SCOPE_ONELEVEL,
414                                               attrs=[attr],
415                                               controls=[sort_control,
416                                                         vlv_search])
417
418                         results = [x[attr][0] for x in res]
419
420                         self.assertCorrectResults(results, expected_order,
421                                                   offset, before, after)
422
423     def test_server_vlv_with_expression(self):
424         """What happens when we run the VLV with an expression?"""
425         expressions = ["(objectClass=*)",
426                        "(cn=%s)" % self.users[-1]['cn'],
427                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
428                        ]
429         self.run_index_tests_with_expressions(expressions)
430
431     def test_server_vlv_with_failing_expression(self):
432         """What happens when we run the VLV on an expression that matches
433         nothing?"""
434         expressions = ["(samaccountname=testferf)",
435                        "(cn=hefalump)",
436                        ]
437         self.run_index_tests_with_expressions(expressions)
438
439     def run_gte_tests_with_expressions(self, expressions):
440         # Here we don't test every before/after combination.
441         attrs = [x for x in self.users[0].keys() if x not in
442                  ('dn', 'objectclass')]
443         for expression in expressions:
444             for attr in attrs:
445                 gte_order, expected_order, gte_map = \
446                     self.get_gte_tests_and_order(attr, expression)
447                 # In case there is some order dependency, disorder tests
448                 gte_tests = gte_order[:]
449                 random.seed(2)
450                 random.shuffle(gte_tests)
451                 res = None
452                 sort_control = "server_sort:1:0:%s" % attr
453
454                 expected_order = self.get_expected_order(attr, expression)
455                 sort_control = "server_sort:1:0:%s" % attr
456                 res = None
457                 for before in range(0, 11):
458                     after = before
459                     for gte in gte_tests:
460                         if res is not None:
461                             cookie = get_cookie(res.controls)
462                         else:
463                             cookie = None
464                         vlv_search = encode_vlv_control(before=before,
465                                                         after=after,
466                                                         gte=get_bytes(gte),
467                                                         cookie=cookie)
468
469                         res = self.ldb.search(self.ou,
470                                               scope=ldb.SCOPE_ONELEVEL,
471                                               expression=expression,
472                                               attrs=[attr],
473                                               controls=[sort_control,
474                                                         vlv_search])
475
476                         results = [x[attr][0] for x in res]
477                         offset = gte_map.get(gte, len(expected_order))
478
479                         # here offset is 0-based
480                         start = max(offset - before, 0)
481                         end = offset + 1 + after
482
483                         expected_results = expected_order[start: end]
484
485                         self.assertEquals(expected_results, results)
486
487     def test_vlv_gte_with_expression(self):
488         """What happens when we run the VLV with an expression?"""
489         expressions = ["(objectClass=*)",
490                        "(cn=%s)" % self.users[-1]['cn'],
491                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
492                        ]
493         self.run_gte_tests_with_expressions(expressions)
494
495     def test_vlv_gte_with_failing_expression(self):
496         """What happens when we run the VLV on an expression that matches
497         nothing?"""
498         expressions = ["(samaccountname=testferf)",
499                        "(cn=hefalump)",
500                        ]
501         self.run_gte_tests_with_expressions(expressions)
502
503     def test_server_vlv_with_cookie_while_adding_and_deleting(self):
504         """What happens if we add or remove items in the middle of the VLV?
505
506         Nothing. The search and the sort is not repeated, and we only
507         deal with the objects originally found.
508         """
509         attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
510                           ('dn', 'objectclass')]
511         user_number = 0
512         iteration = 0
513         for attr in attrs:
514             full_results, controls, sort_control = \
515                             self.get_full_list(attr, True)
516             original_n = len(self.users)
517
518             expected_order = full_results
519             random.seed(1)
520
521             for before in list(range(0, 3)) + [6, 11, 19]:
522                 for after in list(range(0, 3)) + [6, 11, 19]:
523                     start = max(before - 1, 1)
524                     end = max(start + 4, original_n - after + 2)
525                     for offset in range(start, end):
526                         # if iteration > 2076:
527                         #    return
528                         cookie = get_cookie(controls, original_n)
529                         vlv_search = encode_vlv_control(before=before,
530                                                         after=after,
531                                                         offset=offset,
532                                                         n=original_n,
533                                                         cookie=cookie)
534
535                         iteration += 1
536                         res = self.ldb.search(self.ou,
537                                               scope=ldb.SCOPE_ONELEVEL,
538                                               attrs=[attr],
539                                               controls=[sort_control,
540                                                         vlv_search])
541
542                         controls = res.controls
543                         results = [x[attr][0] for x in res]
544                         real_offset = max(1, min(offset, len(expected_order)))
545
546                         expected_results = []
547                         skipped = 0
548                         begin_offset = max(real_offset - before - 1, 0)
549                         real_before = min(before, real_offset - 1)
550                         real_after = min(after,
551                                          len(expected_order) - real_offset)
552
553                         for x in expected_order[begin_offset:]:
554                             if x is not None:
555                                 expected_results.append(get_bytes(x[0]))
556                                 if (len(expected_results) ==
557                                     real_before + real_after + 1):
558                                     break
559                             else:
560                                 skipped += 1
561
562                         if expected_results != results:
563                             print("attr %s before %d after %d offset %d" %
564                                   (attr, before, after, offset))
565                         self.assertEquals(expected_results, results)
566
567                         n = len(self.users)
568                         if random.random() < 0.1 + (n < 5) * 0.05:
569                             if n == 0:
570                                 i = 0
571                             else:
572                                 i = random.randrange(n)
573                             user = self.create_user(i, n, suffix='-%s' %
574                                                     user_number)
575                             user_number += 1
576                         if random.random() < 0.1  + (n > 50) * 0.02 and n:
577                             index = random.randrange(n)
578                             user = self.users.pop(index)
579
580                             self.ldb.delete(user['dn'])
581
582                             replaced = (user[attr], user['cn'])
583                             if replaced in expected_order:
584                                 i = expected_order.index(replaced)
585                                 expected_order[i] = None
586
587     def test_server_vlv_with_cookie_while_changing(self):
588         """What happens if we modify items in the middle of the VLV?
589
590         The expected behaviour (as found on Windows) is the sort is
591         not repeated, but the changes in attributes are reflected.
592         """
593         attrs = [x for x in self.users[0].keys() if x not in
594                  ('dn', 'objectclass', 'cn')]
595         for attr in attrs:
596             n_users = len(self.users)
597             expected_order = [x.upper() for x in self.get_expected_order(attr)]
598             sort_control = "server_sort:1:0:%s" % attr
599             res = None
600             i = 0
601
602             # First we'll fetch the whole list so we know the original
603             # sort order. This is necessary because we don't know how
604             # the server will order equivalent items. We are using the
605             # dn as a key.
606             half_n = n_users // 2
607             vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
608             res = self.ldb.search(self.ou,
609                                   scope=ldb.SCOPE_ONELEVEL,
610                                   attrs=['dn', attr],
611                                   controls=[sort_control, vlv_search])
612
613             results = [x[attr][0].upper() for x in res]
614             #self.assertEquals(expected_order, results)
615
616             dn_order = [str(x['dn']) for x in res]
617             values = results[:]
618
619             for before in range(0, 3):
620                 for after in range(0, 3):
621                     for offset in range(1 + before, n_users - after):
622                         cookie = get_cookie(res.controls, len(self.users))
623                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
624                                       (before, after, offset, len(self.users),
625                                        cookie))
626
627                         res = self.ldb.search(self.ou,
628                                               scope=ldb.SCOPE_ONELEVEL,
629                                               attrs=['dn', attr],
630                                               controls=[sort_control,
631                                                         vlv_search])
632
633                         dn_results = [str(x['dn']) for x in res]
634                         dn_expected = dn_order[offset - before - 1:
635                                                offset + after]
636
637                         self.assertEquals(dn_expected, dn_results)
638
639                         results = [x[attr][0].upper() for x in res]
640
641                         self.assertCorrectResults(results, values,
642                                                   offset, before, after)
643
644                         i += 1
645                         if i % 3 == 2:
646                             if (attr in self.locale_sorted_keys or
647                                 attr in self.binary_sorted_keys):
648                                 i1 = i % n_users
649                                 i2 = (i ^ 255) % n_users
650                                 dn1 = dn_order[i1]
651                                 dn2 = dn_order[i2]
652                                 v2 = values[i2]
653
654                                 if v2 in self.locale_sorted_keys:
655                                     v2 += '-%d' % i
656                                 cn1 = dn1.split(',', 1)[0][3:]
657                                 cn2 = dn2.split(',', 1)[0][3:]
658
659                                 values[i1] = v2
660
661                                 m = ldb.Message()
662                                 m.dn = ldb.Dn(self.ldb, dn1)
663                                 m[attr] = ldb.MessageElement(v2,
664                                                              ldb.FLAG_MOD_REPLACE,
665                                                              attr)
666
667                                 self.ldb.modify(m)
668
669     def test_server_vlv_fractions_with_cookie(self):
670         """What happens when the count is set to an arbitrary number?
671
672         In that case the offset and the count form a fraction, and the
673         VLV should be centred at a point offset/count of the way
674         through. For example, if offset is 3 and count is 6, the VLV
675         should be looking around halfway. The actual algorithm is a
676         bit fiddlier than that, because of the one-basedness of VLV.
677         """
678         attrs = [x for x in self.users[0].keys() if x not in
679                  ('dn', 'objectclass')]
680
681         n_users = len(self.users)
682
683         random.seed(4)
684
685         for attr in attrs:
686             full_results, controls, sort_control = self.get_full_list(attr)
687             self.assertEqual(len(full_results), n_users)
688             for before in range(0, 2):
689                 for after in range(0, 2):
690                     for denominator in range(1, 20):
691                         for offset in range(1, denominator + 3):
692                             cookie = get_cookie(controls, len(self.users))
693                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
694                                           (before, after, offset,
695                                            denominator,
696                                            cookie))
697                             try:
698                                 res = self.ldb.search(self.ou,
699                                                       scope=ldb.SCOPE_ONELEVEL,
700                                                       attrs=[attr],
701                                                       controls=[sort_control,
702                                                                 vlv_search])
703                             except ldb.LdbError as e:
704                                 if offset != 0:
705                                     raise
706                                 print("offset %d denominator %d raised error "
707                                       "expected error %s\n"
708                                       "(offset zero is illegal unless "
709                                       "content count is zero)" %
710                                       (offset, denominator, e))
711                                 continue
712
713                             results = [str(x[attr][0]).lower() for x in res]
714
715                             if denominator == 0:
716                                 denominator = n_users
717                                 if offset == 0:
718                                     offset = denominator
719                             elif denominator == 1:
720                                 # the offset can only be 1, but the 1/1 case
721                                 # means something special
722                                 if offset == 1:
723                                     real_offset = n_users
724                                 else:
725                                     real_offset = 1
726                             else:
727                                 if offset > denominator:
728                                     offset = denominator
729                                 real_offset = (1 +
730                                                int(round((n_users - 1) *
731                                                          (offset - 1) /
732                                                          (denominator - 1.0)))
733                                                )
734
735                             self.assertCorrectResults(results, full_results,
736                                                       real_offset, before,
737                                                       after)
738
739                             controls = res.controls
740                             if False:
741                                 for c in list(controls):
742                                     cstr = str(c)
743                                     if cstr.startswith('vlv_resp'):
744                                         bits = cstr.rsplit(':')
745                                         print("the answer is %s; we said %d" %
746                                               (bits[2], real_offset))
747                                         break
748
749     def test_server_vlv_no_cookie(self):
750         attrs = [x for x in self.users[0].keys() if x not in
751                  ('dn', 'objectclass')]
752
753         for attr in attrs:
754             expected_order = self.get_expected_order(attr)
755             sort_control = "server_sort:1:0:%s" % attr
756             for before in range(0, 5):
757                 for after in range(0, 7):
758                     for offset in range(1 + before, len(self.users) - after):
759                         res = self.ldb.search(self.ou,
760                                               scope=ldb.SCOPE_ONELEVEL,
761                                               attrs=[attr],
762                                               controls=[sort_control,
763                                                         "vlv:1:%d:%d:%d:0" %
764                                                         (before, after,
765                                                          offset)])
766                         results = [x[attr][0] for x in res]
767                         self.assertCorrectResults(results, expected_order,
768                                                   offset, before, after)
769
770     def get_expected_order_showing_deleted(self, attr,
771                                            expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
772                                            base=None,
773                                            scope=ldb.SCOPE_SUBTREE
774                                            ):
775         """Fetch the whole list sorted on the attribute, using sort only,
776         searching in the entire tree, not just our OU. This is the
777         way to find deleted objects.
778         """
779         if base is None:
780             base = self.base_dn
781         sort_control = "server_sort:1:0:%s" % attr
782         controls = [sort_control, "show_deleted:1"]
783
784         res = self.ldb.search(base,
785                               scope=scope,
786                               expression=expression,
787                               attrs=[attr],
788                               controls=controls)
789         results = [x[attr][0] for x in res]
790         return results
791
792     def add_deleted_users(self, n):
793         deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
794                          for i in range(n)]
795
796         for user in deleted_users:
797             self.delete_user(user)
798
799     def test_server_vlv_no_cookie_show_deleted(self):
800         """What do we see with the show_deleted control?"""
801         attrs = ['objectGUID',
802                  'cn',
803                  'sAMAccountName',
804                  'objectSid',
805                  'name',
806                  'whenChanged',
807                  'usnChanged'
808                  ]
809
810         # add some deleted users first, just in case there are none
811         self.add_deleted_users(6)
812         random.seed(22)
813         expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
814
815         for attr in attrs:
816             show_deleted_control = "show_deleted:1"
817             expected_order = self.get_expected_order_showing_deleted(attr,
818                                                                      expression)
819             n = len(expected_order)
820             sort_control = "server_sort:1:0:%s" % attr
821             for before in [3, 1, 0]:
822                 for after in [0, 2]:
823                     # don't test every position, because there could be hundreds.
824                     # jump back and forth instead
825                     for i in range(20):
826                         offset = random.randrange(max(1, before - 2),
827                                                   min(n - after + 2, n))
828                         res = self.ldb.search(self.base_dn,
829                                               expression=expression,
830                                               scope=ldb.SCOPE_SUBTREE,
831                                               attrs=[attr],
832                                               controls=[sort_control,
833                                                         show_deleted_control,
834                                                         "vlv:1:%d:%d:%d:0" %
835                                                         (before, after,
836                                                          offset)
837                                                         ]
838                                               )
839                         results = [x[attr][0] for x in res]
840                         self.assertCorrectResults(results, expected_order,
841                                                   offset, before, after)
842
843     def test_server_vlv_no_cookie_show_deleted_only(self):
844         """What do we see with the show_deleted control when we're not looking
845         at any non-deleted things"""
846         attrs = ['objectGUID',
847                  'cn',
848                  'sAMAccountName',
849                  'objectSid',
850                  'whenChanged',
851                  ]
852
853         # add some deleted users first, just in case there are none
854         self.add_deleted_users(4)
855         base = 'CN=Deleted Objects,%s' % self.base_dn
856         expression = "(cn=vlv-deleted*)"
857         for attr in attrs:
858             show_deleted_control = "show_deleted:1"
859             expected_order = self.get_expected_order_showing_deleted(attr,
860                                                                      expression=expression,
861                                                                      base=base,
862                                                                      scope=ldb.SCOPE_ONELEVEL)
863             print("searching for attr %s amongst %d deleted objects" %
864                   (attr, len(expected_order)))
865             sort_control = "server_sort:1:0:%s" % attr
866             step = max(len(expected_order) // 10, 1)
867             for before in [3, 0]:
868                 for after in [0, 2]:
869                     for offset in range(1 + before,
870                                         len(expected_order) - after,
871                                         step):
872                         res = self.ldb.search(base,
873                                               expression=expression,
874                                               scope=ldb.SCOPE_ONELEVEL,
875                                               attrs=[attr],
876                                               controls=[sort_control,
877                                                         show_deleted_control,
878                                                         "vlv:1:%d:%d:%d:0" %
879                                                         (before, after,
880                                                          offset)])
881                         results = [x[attr][0] for x in res]
882                         self.assertCorrectResults(results, expected_order,
883                                                   offset, before, after)
884
885     def test_server_vlv_with_cookie_show_deleted(self):
886         """What do we see with the show_deleted control?"""
887         attrs = ['objectGUID',
888                  'cn',
889                  'sAMAccountName',
890                  'objectSid',
891                  'name',
892                  'whenChanged',
893                  'usnChanged'
894                  ]
895         self.add_deleted_users(6)
896         random.seed(23)
897         for attr in attrs:
898             expected_order = self.get_expected_order(attr)
899             sort_control = "server_sort:1:0:%s" % attr
900             res = None
901             show_deleted_control = "show_deleted:1"
902             expected_order = self.get_expected_order_showing_deleted(attr)
903             n = len(expected_order)
904             expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
905             for before in [3, 2, 1, 0]:
906                 after = before
907                 for i in range(20):
908                     offset = random.randrange(max(1, before - 2),
909                                               min(n - after + 2, n))
910                     if res is None:
911                         vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
912                                                            offset)
913                     else:
914                         cookie = get_cookie(res.controls, n)
915                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
916                                       (before, after, offset, n,
917                                        cookie))
918
919                     res = self.ldb.search(self.base_dn,
920                                           expression=expression,
921                                           scope=ldb.SCOPE_SUBTREE,
922                                           attrs=[attr],
923                                           controls=[sort_control,
924                                                     vlv_search,
925                                                     show_deleted_control])
926
927                     results = [x[attr][0] for x in res]
928
929                     self.assertCorrectResults(results, expected_order,
930                                               offset, before, after)
931
932     def test_server_vlv_gte_with_cookie(self):
933         attrs = [x for x in self.users[0].keys() if x not in
934                  ('dn', 'objectclass')]
935         for attr in attrs:
936             gte_order, expected_order, gte_map = \
937                                         self.get_gte_tests_and_order(attr)
938             # In case there is some order dependency, disorder tests
939             gte_tests = gte_order[:]
940             random.seed(1)
941             random.shuffle(gte_tests)
942             res = None
943             sort_control = "server_sort:1:0:%s" % attr
944             for before in [0, 1, 2, 4]:
945                 for after in [0, 1, 3, 6]:
946                     for gte in gte_tests:
947                         if res is not None:
948                             cookie = get_cookie(res.controls, len(self.users))
949                         else:
950                             cookie = None
951                         vlv_search = encode_vlv_control(before=before,
952                                                         after=after,
953                                                         gte=get_bytes(gte),
954                                                         cookie=cookie)
955
956                         res = self.ldb.search(self.ou,
957                                               scope=ldb.SCOPE_ONELEVEL,
958                                               attrs=[attr],
959                                               controls=[sort_control,
960                                                         vlv_search])
961
962                         results = [x[attr][0] for x in res]
963                         offset = gte_map.get(gte, len(expected_order))
964
965                         # here offset is 0-based
966                         start = max(offset - before, 0)
967                         end = offset + 1 + after
968
969                         expected_results = expected_order[start: end]
970
971                         self.assertEquals(expected_results, results)
972
973     def test_server_vlv_gte_no_cookie(self):
974         attrs = [x for x in self.users[0].keys() if x not in
975                  ('dn', 'objectclass')]
976         iteration = 0
977         for attr in attrs:
978             gte_order, expected_order, gte_map = \
979                                         self.get_gte_tests_and_order(attr)
980             # In case there is some order dependency, disorder tests
981             gte_tests = gte_order[:]
982             random.seed(1)
983             random.shuffle(gte_tests)
984
985             sort_control = "server_sort:1:0:%s" % attr
986             for before in [0, 1, 3]:
987                 for after in [0, 4]:
988                     for gte in gte_tests:
989                         vlv_search = encode_vlv_control(before=before,
990                                                         after=after,
991                                                         gte=get_bytes(gte))
992
993                         res = self.ldb.search(self.ou,
994                                               scope=ldb.SCOPE_ONELEVEL,
995                                               attrs=[attr],
996                                               controls=[sort_control,
997                                                         vlv_search])
998                         results = [x[attr][0] for x in res]
999
1000                         # here offset is 0-based
1001                         offset = gte_map.get(gte, len(expected_order))
1002                         start = max(offset - before, 0)
1003                         end = offset + after + 1
1004                         expected_results = expected_order[start: end]
1005                         iteration += 1
1006                         if expected_results != results:
1007                             middle = expected_order[len(expected_order) // 2]
1008                             print(expected_results, results)
1009                             print(middle)
1010                             print(expected_order)
1011                             print()
1012                             print("\nattr %s offset %d before %d "
1013                                   "after %d gte %s" %
1014                                   (attr, offset, before, after, gte))
1015                         self.assertEquals(expected_results, results)
1016
1017     def test_multiple_searches(self):
1018         """The maximum number of concurrent vlv searches per connection is
1019         currently set at 3. That means if you open 4 VLV searches the
1020         cookie on the first one should fail.
1021         """
1022         # Windows has a limit of 10 VLVs where there are low numbers
1023         # of objects in each search.
1024         attrs = ([x for x in self.users[0].keys() if x not in
1025                   ('dn', 'objectclass')] * 2)[:12]
1026
1027         vlv_cookies = []
1028         for attr in attrs:
1029             sort_control = "server_sort:1:0:%s" % attr
1030
1031             res = self.ldb.search(self.ou,
1032                                   scope=ldb.SCOPE_ONELEVEL,
1033                                   attrs=[attr],
1034                                   controls=[sort_control,
1035                                             "vlv:1:1:1:1:0"])
1036
1037             cookie = get_cookie(res.controls, len(self.users))
1038             vlv_cookies.append(cookie)
1039             time.sleep(0.2)
1040
1041         # now this one should fail
1042         self.assertRaises(ldb.LdbError,
1043                           self.ldb.search,
1044                           self.ou,
1045                           scope=ldb.SCOPE_ONELEVEL,
1046                           attrs=[attr],
1047                           controls=[sort_control,
1048                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1049
1050         # and this one should succeed
1051         res = self.ldb.search(self.ou,
1052                               scope=ldb.SCOPE_ONELEVEL,
1053                               attrs=[attr],
1054                               controls=[sort_control,
1055                                         "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1056
1057         # this one should fail because it is a new connection and
1058         # doesn't share cookies
1059         new_ldb = SamDB(host, credentials=creds,
1060                         session_info=system_session(lp), lp=lp)
1061
1062         self.assertRaises(ldb.LdbError,
1063                           new_ldb.search, self.ou,
1064                           scope=ldb.SCOPE_ONELEVEL,
1065                           attrs=[attr],
1066                           controls=[sort_control,
1067                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1068
1069         # but now without the critical flag it just does no VLV.
1070         new_ldb.search(self.ou,
1071                        scope=ldb.SCOPE_ONELEVEL,
1072                        attrs=[attr],
1073                        controls=[sort_control,
1074                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1075
1076
# A host given without a URL scheme is opened as a local tdb file when
# it names an existing file, and as an LDAP server otherwise.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


TestProgram(module=__name__, opts=subunitopts)