2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
12 sys.path.insert(0, "bin/python")
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
16 import samba.getopt as options
18 from samba.auth import system_session
20 from samba.samdb import SamDB
24 parser = optparse.OptionParser("vlv.py [options] <host>")
25 sambaopts = options.SambaOptions(parser)
26 parser.add_option_group(sambaopts)
27 parser.add_option_group(options.VersionOptions(parser))
28 # use command line creds if available
29 credopts = options.CredentialsOptions(parser)
30 parser.add_option_group(credopts)
31 subunitopts = SubunitOptions(parser)
32 parser.add_option_group(subunitopts)
34 parser.add_option('--elements', type='int', default=20,
35 help="use this many elements in the tests")
37 parser.add_option('--delete-in-setup', action='store_true',
38 help="cleanup in next setup rather than teardown")
40 parser.add_option('--skip-attr-regex',
41 help="ignore attributes matching this regex")
43 opts, args = parser.parse_args()
51 lp = sambaopts.get_loadparm()
52 creds = credopts.get_credentials(lp)
54 N_ELEMENTS = opts.elements
57 class VlvTestException(Exception):
61 def encode_vlv_control(critical=1,
67 s = "vlv:%d:%d:%d:" % (critical, before, after)
69 if offset is not None:
70 m = "%d:%d" % (offset, n)
71 elif ':' in gte or '\x00' in gte:
72 gte = base64.b64encode(gte).decode('utf8')
73 m = "base64>=%s" % gte
80 return s + m + ':' + cookie
83 def get_cookie(controls, expected_n=None):
84 """Get the cookie, STILL base64 encoded, or raise ValueError."""
85 for c in list(controls):
87 if cstr.startswith('vlv_resp'):
88 head, n, _, cookie = cstr.rsplit(':', 3)
89 if expected_n is not None and int(n) != expected_n:
90 raise ValueError("Expected %s items, server said %s" %
93 raise ValueError("there is no VLV response")
96 class VLVTests(samba.tests.TestCase):
98 def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
99 name = "%s%d%s" % (prefix, i, suffix)
102 "objectclass": "user",
103 'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
104 "roomNumber": "%sbc" % (n - i),
106 "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
107 "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
108 "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
109 "flags": str(i * (n - i)),
110 "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
115 # _user_broken_attrs tests are broken due to problems outside
117 _user_broken_attrs = {
118 # Sort doesn't look past a NUL byte.
119 "photo": "\x00%d" % (n - i),
120 "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
122 "displayNamePrintable": "%d\x00%c" % (i, i & 255),
123 "adminDisplayName": "%d\x00b" % (n - i),
124 "title": "%d%sb" % (n - i, '\x00' * i),
125 "comment": "Favourite colour is %d" % (n % (i + 1)),
127 # Names that vary only in case. Windows returns
128 # equivalent addresses in the order they were put
129 # in ('a st', 'A st',...).
130 "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
133 if attrs is not None:
136 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
138 if opts.skip_attr_regex:
139 match = re.compile(opts.skip_attr_regex).search
140 for k in user.keys():
144 self.users.append(user)
149 super(VLVTests, self).setUp()
150 self.ldb = SamDB(host, credentials=creds,
151 session_info=system_session(lp), lp=lp)
153 self.base_dn = self.ldb.domain_dn()
154 self.ou = "ou=vlv,%s" % self.base_dn
155 if opts.delete_in_setup:
157 self.ldb.delete(self.ou, ['tree_delete:1'])
158 except ldb.LdbError as e:
159 print("tried deleting %s, got error %s" % (self.ou, e))
162 "objectclass": "organizationalUnit"})
165 for i in range(N_ELEMENTS):
166 self.create_user(i, N_ELEMENTS)
168 attrs = self.users[0].keys()
169 self.binary_sorted_keys = ['audio',
173 "displayNamePrintable"]
175 self.numeric_sorted_keys = ['flags',
178 self.timestamp_keys = ['msTSExpireDate4']
180 self.int64_keys = set(['accountExpires'])
182 self.locale_sorted_keys = [x for x in attrs if
183 x not in (self.binary_sorted_keys +
184 self.numeric_sorted_keys)]
186 # don't try spaces, etc in cn
187 self.delicate_keys = ['cn']
190 super(VLVTests, self).tearDown()
191 if not opts.delete_in_setup:
192 self.ldb.delete(self.ou, ['tree_delete:1'])
194 def get_full_list(self, attr, include_cn=False):
195 """Fetch the whole list sorted on the attribute, using the VLV.
196 This way you get a VLV cookie."""
197 n_users = len(self.users)
198 sort_control = "server_sort:1:0:%s" % attr
199 half_n = n_users // 2
200 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
204 res = self.ldb.search(self.ou,
205 scope=ldb.SCOPE_ONELEVEL,
207 controls=[sort_control,
210 full_results = [(x[attr][0], x['cn'][0]) for x in res]
212 full_results = [x[attr][0].lower() for x in res]
213 controls = res.controls
214 return full_results, controls, sort_control
216 def get_expected_order(self, attr, expression=None):
217 """Fetch the whole list sorted on the attribute, using sort only."""
218 sort_control = "server_sort:1:0:%s" % attr
219 res = self.ldb.search(self.ou,
220 scope=ldb.SCOPE_ONELEVEL,
221 expression=expression,
223 controls=[sort_control])
224 results = [x[attr][0] for x in res]
227 def delete_user(self, user):
228 self.ldb.delete(user['dn'])
229 del self.users[self.users.index(user)]
231 def get_gte_tests_and_order(self, attr, expression=None):
232 expected_order = self.get_expected_order(attr, expression=expression)
234 if attr in self.delicate_keys:
242 elif attr in self.timestamp_keys:
253 elif attr not in self.numeric_sorted_keys:
267 gte_keys.append(expected_order[len(expected_order) // 2] + ' tail')
270 # "numeric" means positive integers
271 # doesn't work with -1, 3.14, ' 3', '9' * 20
278 if attr in self.int64_keys:
279 gte_keys += ['3' * 12, '71' * 8]
281 for i, x in enumerate(gte_keys):
282 user = self.create_user(i, N_ELEMENTS,
285 gte_users.append(user)
287 gte_order = self.get_expected_order(attr)
288 for user in gte_users:
289 self.delete_user(user)
292 expected_order_2 = self.get_expected_order(attr, expression=expression)
293 self.assertEqual(expected_order, expected_order_2)
295 # Map gte tests to indexes in expected order. This will break
296 # if gte_order and expected_order are differently ordered (as
300 # index to the first one with each value
302 for i, k in enumerate(expected_order):
303 if k not in index_map:
318 gte_map[k] = len(expected_order)
323 print(" %10s => %10s" % (k, gte_map[k]))
325 return gte_order, expected_order, gte_map
327 def assertCorrectResults(self, results, expected_order,
328 offset, before, after):
329 """A helper to calculate offsets correctly and say as much as possible
330 when something goes wrong."""
332 start = max(offset - before - 1, 0)
334 expected_results = expected_order[start: end]
336 # if it is a tuple with the cn, drop the cn
337 if expected_results and isinstance(expected_results[0], tuple):
338 expected_results = [x[0] for x in expected_results]
340 if expected_results == results:
343 if expected_order is not None:
344 print("expected order: %s" % expected_order[:20])
345 if len(expected_order) > 20:
346 print("... and %d more not shown" % (len(expected_order) - 20))
348 print("offset %d before %d after %d" % (offset, before, after))
349 print("start %d end %d" % (start, end))
350 print("expected: %s" % expected_results)
351 print("got : %s" % results)
352 self.assertEquals(expected_results, results)
354 def test_server_vlv_with_cookie(self):
355 attrs = [x for x in self.users[0].keys() if x not in
356 ('dn', 'objectclass')]
358 expected_order = self.get_expected_order(attr)
359 sort_control = "server_sort:1:0:%s" % attr
362 for before in [10, 0, 3, 1, 4, 5, 2]:
363 for after in [0, 3, 1, 4, 5, 2, 7]:
364 for offset in range(max(1, before - 2),
365 min(n - after + 2, n)):
367 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
370 cookie = get_cookie(res.controls, n)
371 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
372 (before, after, offset, n,
375 res = self.ldb.search(self.ou,
376 scope=ldb.SCOPE_ONELEVEL,
378 controls=[sort_control,
381 results = [x[attr][0] for x in res]
383 self.assertCorrectResults(results, expected_order,
384 offset, before, after)
386 def run_index_tests_with_expressions(self, expressions):
387 # Here we don't test every before/after combination.
388 attrs = [x for x in self.users[0].keys() if x not in
389 ('dn', 'objectclass')]
391 for expression in expressions:
392 expected_order = self.get_expected_order(attr, expression)
393 sort_control = "server_sort:1:0:%s" % attr
395 n = len(expected_order)
396 for before in range(0, 11):
398 for offset in range(max(1, before - 2),
399 min(n - after + 2, n)):
401 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
404 cookie = get_cookie(res.controls)
405 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
406 (before, after, offset, n,
409 res = self.ldb.search(self.ou,
410 expression=expression,
411 scope=ldb.SCOPE_ONELEVEL,
413 controls=[sort_control,
416 results = [x[attr][0] for x in res]
418 self.assertCorrectResults(results, expected_order,
419 offset, before, after)
421 def test_server_vlv_with_expression(self):
422 """What happens when we run the VLV with an expression?"""
423 expressions = ["(objectClass=*)",
424 "(cn=%s)" % self.users[-1]['cn'],
425 "(roomNumber=%s)" % self.users[0]['roomNumber'],
427 self.run_index_tests_with_expressions(expressions)
429 def test_server_vlv_with_failing_expression(self):
430 """What happens when we run the VLV on an expression that matches
432 expressions = ["(samaccountname=testferf)",
435 self.run_index_tests_with_expressions(expressions)
437 def run_gte_tests_with_expressions(self, expressions):
438 # Here we don't test every before/after combination.
439 attrs = [x for x in self.users[0].keys() if x not in
440 ('dn', 'objectclass')]
441 for expression in expressions:
443 gte_order, expected_order, gte_map = \
444 self.get_gte_tests_and_order(attr, expression)
445 # In case there is some order dependency, disorder tests
446 gte_tests = gte_order[:]
448 random.shuffle(gte_tests)
450 sort_control = "server_sort:1:0:%s" % attr
452 expected_order = self.get_expected_order(attr, expression)
453 sort_control = "server_sort:1:0:%s" % attr
455 for before in range(0, 11):
457 for gte in gte_tests:
459 cookie = get_cookie(res.controls)
462 vlv_search = encode_vlv_control(before=before,
467 res = self.ldb.search(self.ou,
468 scope=ldb.SCOPE_ONELEVEL,
469 expression=expression,
471 controls=[sort_control,
474 results = [x[attr][0] for x in res]
475 offset = gte_map.get(gte, len(expected_order))
477 # here offset is 0-based
478 start = max(offset - before, 0)
479 end = offset + 1 + after
481 expected_results = expected_order[start: end]
483 self.assertEquals(expected_results, results)
485 def test_vlv_gte_with_expression(self):
486 """What happens when we run the VLV with an expression?"""
487 expressions = ["(objectClass=*)",
488 "(cn=%s)" % self.users[-1]['cn'],
489 "(roomNumber=%s)" % self.users[0]['roomNumber'],
491 self.run_gte_tests_with_expressions(expressions)
493 def test_vlv_gte_with_failing_expression(self):
494 """What happens when we run the VLV on an expression that matches
496 expressions = ["(samaccountname=testferf)",
499 self.run_gte_tests_with_expressions(expressions)
501 def test_server_vlv_with_cookie_while_adding_and_deleting(self):
502 """What happens if we add or remove items in the middle of the VLV?
504 Nothing. The search and the sort is not repeated, and we only
505 deal with the objects originally found.
507 attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
508 ('dn', 'objectclass')]
512 full_results, controls, sort_control = \
513 self.get_full_list(attr, True)
514 original_n = len(self.users)
516 expected_order = full_results
519 for before in range(0, 3) + [6, 11, 19]:
520 for after in range(0, 3) + [6, 11, 19]:
521 start = max(before - 1, 1)
522 end = max(start + 4, original_n - after + 2)
523 for offset in range(start, end):
524 # if iteration > 2076:
526 cookie = get_cookie(controls, original_n)
527 vlv_search = encode_vlv_control(before=before,
534 res = self.ldb.search(self.ou,
535 scope=ldb.SCOPE_ONELEVEL,
537 controls=[sort_control,
540 controls = res.controls
541 results = [x[attr][0] for x in res]
542 real_offset = max(1, min(offset, len(expected_order)))
544 expected_results = []
546 begin_offset = max(real_offset - before - 1, 0)
547 real_before = min(before, real_offset - 1)
548 real_after = min(after,
549 len(expected_order) - real_offset)
551 for x in expected_order[begin_offset:]:
553 expected_results.append(x[0])
554 if (len(expected_results) ==
555 real_before + real_after + 1):
560 if expected_results != results:
561 print("attr %s before %d after %d offset %d" %
562 (attr, before, after, offset))
563 self.assertEquals(expected_results, results)
566 if random.random() < 0.1 + (n < 5) * 0.05:
570 i = random.randrange(n)
571 user = self.create_user(i, n, suffix='-%s' %
574 if random.random() < 0.1 + (n > 50) * 0.02 and n:
575 index = random.randrange(n)
576 user = self.users.pop(index)
578 self.ldb.delete(user['dn'])
580 replaced = (user[attr], user['cn'])
581 if replaced in expected_order:
582 i = expected_order.index(replaced)
583 expected_order[i] = None
585 def test_server_vlv_with_cookie_while_changing(self):
586 """What happens if we modify items in the middle of the VLV?
588 The expected behaviour (as found on Windows) is the sort is
589 not repeated, but the changes in attributes are reflected.
591 attrs = [x for x in self.users[0].keys() if x not in
592 ('dn', 'objectclass', 'cn')]
594 n_users = len(self.users)
595 expected_order = [x.upper() for x in self.get_expected_order(attr)]
596 sort_control = "server_sort:1:0:%s" % attr
600 # First we'll fetch the whole list so we know the original
601 # sort order. This is necessary because we don't know how
602 # the server will order equivalent items. We are using the
604 half_n = n_users // 2
605 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
606 res = self.ldb.search(self.ou,
607 scope=ldb.SCOPE_ONELEVEL,
609 controls=[sort_control, vlv_search])
611 results = [x[attr][0].upper() for x in res]
612 #self.assertEquals(expected_order, results)
614 dn_order = [str(x['dn']) for x in res]
617 for before in range(0, 3):
618 for after in range(0, 3):
619 for offset in range(1 + before, n_users - after):
620 cookie = get_cookie(res.controls, len(self.users))
621 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
622 (before, after, offset, len(self.users),
625 res = self.ldb.search(self.ou,
626 scope=ldb.SCOPE_ONELEVEL,
628 controls=[sort_control,
631 dn_results = [str(x['dn']) for x in res]
632 dn_expected = dn_order[offset - before - 1:
635 self.assertEquals(dn_expected, dn_results)
637 results = [x[attr][0].upper() for x in res]
639 self.assertCorrectResults(results, values,
640 offset, before, after)
644 if (attr in self.locale_sorted_keys or
645 attr in self.binary_sorted_keys):
647 i2 = (i ^ 255) % n_users
652 if v2 in self.locale_sorted_keys:
654 cn1 = dn1.split(',', 1)[0][3:]
655 cn2 = dn2.split(',', 1)[0][3:]
660 m.dn = ldb.Dn(self.ldb, dn1)
661 m[attr] = ldb.MessageElement(v2,
662 ldb.FLAG_MOD_REPLACE,
667 def test_server_vlv_fractions_with_cookie(self):
668 """What happens when the count is set to an arbitrary number?
670 In that case the offset and the count form a fraction, and the
671 VLV should be centred at a point offset/count of the way
672 through. For example, if offset is 3 and count is 6, the VLV
673 should be looking around halfway. The actual algorithm is a
674 bit fiddlier than that, because of the one-basedness of VLV.
676 attrs = [x for x in self.users[0].keys() if x not in
677 ('dn', 'objectclass')]
679 n_users = len(self.users)
684 full_results, controls, sort_control = self.get_full_list(attr)
685 self.assertEqual(len(full_results), n_users)
686 for before in range(0, 2):
687 for after in range(0, 2):
688 for denominator in range(1, 20):
689 for offset in range(1, denominator + 3):
690 cookie = get_cookie(controls, len(self.users))
691 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
692 (before, after, offset,
696 res = self.ldb.search(self.ou,
697 scope=ldb.SCOPE_ONELEVEL,
699 controls=[sort_control,
701 except ldb.LdbError as e:
704 print("offset %d denominator %d raised error "
705 "expected error %s\n"
706 "(offset zero is illegal unless "
707 "content count is zero)" %
708 (offset, denominator, e))
711 results = [x[attr][0].lower() for x in res]
714 denominator = n_users
717 elif denominator == 1:
718 # the offset can only be 1, but the 1/1 case
719 # means something special
721 real_offset = n_users
725 if offset > denominator:
728 int(round((n_users - 1) *
730 (denominator - 1.0)))
733 self.assertCorrectResults(results, full_results,
737 controls = res.controls
739 for c in list(controls):
741 if cstr.startswith('vlv_resp'):
742 bits = cstr.rsplit(':')
743 print("the answer is %s; we said %d" %
744 (bits[2], real_offset))
747 def test_server_vlv_no_cookie(self):
748 attrs = [x for x in self.users[0].keys() if x not in
749 ('dn', 'objectclass')]
752 expected_order = self.get_expected_order(attr)
753 sort_control = "server_sort:1:0:%s" % attr
754 for before in range(0, 5):
755 for after in range(0, 7):
756 for offset in range(1 + before, len(self.users) - after):
757 res = self.ldb.search(self.ou,
758 scope=ldb.SCOPE_ONELEVEL,
760 controls=[sort_control,
764 results = [x[attr][0] for x in res]
765 self.assertCorrectResults(results, expected_order,
766 offset, before, after)
768 def get_expected_order_showing_deleted(self, attr,
769 expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
771 scope=ldb.SCOPE_SUBTREE
773 """Fetch the whole list sorted on the attribute, using sort only,
774 searching in the entire tree, not just our OU. This is the
775 way to find deleted objects.
779 sort_control = "server_sort:1:0:%s" % attr
780 controls = [sort_control, "show_deleted:1"]
782 res = self.ldb.search(base,
784 expression=expression,
787 results = [x[attr][0] for x in res]
790 def add_deleted_users(self, n):
791 deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
794 for user in deleted_users:
795 self.delete_user(user)
797 def test_server_vlv_no_cookie_show_deleted(self):
798 """What do we see with the show_deleted control?"""
799 attrs = ['objectGUID',
808 # add some deleted users first, just in case there are none
809 self.add_deleted_users(6)
811 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
814 show_deleted_control = "show_deleted:1"
815 expected_order = self.get_expected_order_showing_deleted(attr,
817 n = len(expected_order)
818 sort_control = "server_sort:1:0:%s" % attr
819 for before in [3, 1, 0]:
821 # don't test every position, because there could be hundreds.
822 # jump back and forth instead
824 offset = random.randrange(max(1, before - 2),
825 min(n - after + 2, n))
826 res = self.ldb.search(self.base_dn,
827 expression=expression,
828 scope=ldb.SCOPE_SUBTREE,
830 controls=[sort_control,
831 show_deleted_control,
837 results = [x[attr][0] for x in res]
838 self.assertCorrectResults(results, expected_order,
839 offset, before, after)
841 def test_server_vlv_no_cookie_show_deleted_only(self):
842 """What do we see with the show_deleted control when we're not looking
843 at any non-deleted things"""
844 attrs = ['objectGUID',
851 # add some deleted users first, just in case there are none
852 self.add_deleted_users(4)
853 base = 'CN=Deleted Objects,%s' % self.base_dn
854 expression = "(cn=vlv-deleted*)"
856 show_deleted_control = "show_deleted:1"
857 expected_order = self.get_expected_order_showing_deleted(attr,
858 expression=expression,
860 scope=ldb.SCOPE_ONELEVEL)
861 print("searching for attr %s amongst %d deleted objects" %
862 (attr, len(expected_order)))
863 sort_control = "server_sort:1:0:%s" % attr
864 step = max(len(expected_order) // 10, 1)
865 for before in [3, 0]:
867 for offset in range(1 + before,
868 len(expected_order) - after,
870 res = self.ldb.search(base,
871 expression=expression,
872 scope=ldb.SCOPE_ONELEVEL,
874 controls=[sort_control,
875 show_deleted_control,
879 results = [x[attr][0] for x in res]
880 self.assertCorrectResults(results, expected_order,
881 offset, before, after)
883 def test_server_vlv_with_cookie_show_deleted(self):
884 """What do we see with the show_deleted control?"""
885 attrs = ['objectGUID',
893 self.add_deleted_users(6)
896 expected_order = self.get_expected_order(attr)
897 sort_control = "server_sort:1:0:%s" % attr
899 show_deleted_control = "show_deleted:1"
900 expected_order = self.get_expected_order_showing_deleted(attr)
901 n = len(expected_order)
902 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
903 for before in [3, 2, 1, 0]:
906 offset = random.randrange(max(1, before - 2),
907 min(n - after + 2, n))
909 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
912 cookie = get_cookie(res.controls, n)
913 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
914 (before, after, offset, n,
917 res = self.ldb.search(self.base_dn,
918 expression=expression,
919 scope=ldb.SCOPE_SUBTREE,
921 controls=[sort_control,
923 show_deleted_control])
925 results = [x[attr][0] for x in res]
927 self.assertCorrectResults(results, expected_order,
928 offset, before, after)
930 def test_server_vlv_gte_with_cookie(self):
931 attrs = [x for x in self.users[0].keys() if x not in
932 ('dn', 'objectclass')]
934 gte_order, expected_order, gte_map = \
935 self.get_gte_tests_and_order(attr)
936 # In case there is some order dependency, disorder tests
937 gte_tests = gte_order[:]
939 random.shuffle(gte_tests)
941 sort_control = "server_sort:1:0:%s" % attr
942 for before in [0, 1, 2, 4]:
943 for after in [0, 1, 3, 6]:
944 for gte in gte_tests:
946 cookie = get_cookie(res.controls, len(self.users))
949 vlv_search = encode_vlv_control(before=before,
954 res = self.ldb.search(self.ou,
955 scope=ldb.SCOPE_ONELEVEL,
957 controls=[sort_control,
960 results = [x[attr][0] for x in res]
961 offset = gte_map.get(gte, len(expected_order))
963 # here offset is 0-based
964 start = max(offset - before, 0)
965 end = offset + 1 + after
967 expected_results = expected_order[start: end]
969 self.assertEquals(expected_results, results)
971 def test_server_vlv_gte_no_cookie(self):
972 attrs = [x for x in self.users[0].keys() if x not in
973 ('dn', 'objectclass')]
976 gte_order, expected_order, gte_map = \
977 self.get_gte_tests_and_order(attr)
978 # In case there is some order dependency, disorder tests
979 gte_tests = gte_order[:]
981 random.shuffle(gte_tests)
983 sort_control = "server_sort:1:0:%s" % attr
984 for before in [0, 1, 3]:
986 for gte in gte_tests:
987 vlv_search = encode_vlv_control(before=before,
991 res = self.ldb.search(self.ou,
992 scope=ldb.SCOPE_ONELEVEL,
994 controls=[sort_control,
996 results = [x[attr][0] for x in res]
998 # here offset is 0-based
999 offset = gte_map.get(gte, len(expected_order))
1000 start = max(offset - before, 0)
1001 end = offset + after + 1
1002 expected_results = expected_order[start: end]
1004 if expected_results != results:
1005 middle = expected_order[len(expected_order) // 2]
1006 print(expected_results, results)
1008 print(expected_order)
1010 print("\nattr %s offset %d before %d "
1012 (attr, offset, before, after, gte))
1013 self.assertEquals(expected_results, results)
1015 def test_multiple_searches(self):
1016 """The maximum number of concurrent vlv searches per connection is
1017 currently set at 3. That means if you open 4 VLV searches the
1018 cookie on the first one should fail.
1020 # Windows has a limit of 10 VLVs where there are low numbers
1021 # of objects in each search.
1022 attrs = ([x for x in self.users[0].keys() if x not in
1023 ('dn', 'objectclass')] * 2)[:12]
1027 sort_control = "server_sort:1:0:%s" % attr
1029 res = self.ldb.search(self.ou,
1030 scope=ldb.SCOPE_ONELEVEL,
1032 controls=[sort_control,
1035 cookie = get_cookie(res.controls, len(self.users))
1036 vlv_cookies.append(cookie)
1039 # now this one should fail
1040 self.assertRaises(ldb.LdbError,
1043 scope=ldb.SCOPE_ONELEVEL,
1045 controls=[sort_control,
1046 "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1048 # and this one should succeed
1049 res = self.ldb.search(self.ou,
1050 scope=ldb.SCOPE_ONELEVEL,
1052 controls=[sort_control,
1053 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1055 # this one should fail because it is a new connection and
1056 # doesn't share cookies
1057 new_ldb = SamDB(host, credentials=creds,
1058 session_info=system_session(lp), lp=lp)
1060 self.assertRaises(ldb.LdbError,
1061 new_ldb.search, self.ou,
1062 scope=ldb.SCOPE_ONELEVEL,
1064 controls=[sort_control,
1065 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1067 # but now without the critical flag it just does no VLV.
1068 new_ldb.search(self.ou,
1069 scope=ldb.SCOPE_ONELEVEL,
1071 controls=[sort_control,
1072 "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1075 if "://" not in host:
1076 if os.path.isfile(host):
1077 host = "tdb://%s" % host
1079 host = "ldap://%s" % host
1082 TestProgram(module=__name__, opts=subunitopts)