2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
12 sys.path.insert(0, "bin/python")
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
16 import samba.getopt as options
18 from samba.auth import system_session
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
26 parser = optparse.OptionParser("vlv.py [options] <host>")
27 sambaopts = options.SambaOptions(parser)
28 parser.add_option_group(sambaopts)
29 parser.add_option_group(options.VersionOptions(parser))
30 # use command line creds if available
31 credopts = options.CredentialsOptions(parser)
32 parser.add_option_group(credopts)
33 subunitopts = SubunitOptions(parser)
34 parser.add_option_group(subunitopts)
36 parser.add_option('--elements', type='int', default=20,
37 help="use this many elements in the tests")
39 parser.add_option('--delete-in-setup', action='store_true',
40 help="cleanup in next setup rather than teardown")
42 parser.add_option('--skip-attr-regex',
43 help="ignore attributes matching this regex")
45 opts, args = parser.parse_args()
53 lp = sambaopts.get_loadparm()
54 creds = credopts.get_credentials(lp)
56 N_ELEMENTS = opts.elements
59 class VlvTestException(Exception):
63 def encode_vlv_control(critical=1,
69 s = "vlv:%d:%d:%d:" % (critical, before, after)
71 if offset is not None:
72 m = "%d:%d" % (offset, n)
73 elif b':' in gte or b'\x00' in gte:
74 gte = get_string(base64.b64encode(gte))
75 m = "base64>=%s" % gte
77 m = ">=%s" % get_string(gte)
82 return s + m + ':' + cookie
85 def get_cookie(controls, expected_n=None):
86 """Get the cookie, STILL base64 encoded, or raise ValueError."""
87 for c in list(controls):
89 if cstr.startswith('vlv_resp'):
90 head, n, _, cookie = cstr.rsplit(':', 3)
91 if expected_n is not None and int(n) != expected_n:
92 raise ValueError("Expected %s items, server said %s" %
95 raise ValueError("there is no VLV response")
98 class VLVTests(samba.tests.TestCase):
100 def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
101 name = "%s%d%s" % (prefix, i, suffix)
104 "objectclass": "user",
105 'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
106 "roomNumber": "%sbc" % (n - i),
108 "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
109 "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
110 "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
111 "flags": str(i * (n - i)),
112 "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
117 # _user_broken_attrs tests are broken due to problems outside
119 _user_broken_attrs = {
120 # Sort doesn't look past a NUL byte.
121 "photo": "\x00%d" % (n - i),
122 "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
124 "displayNamePrintable": "%d\x00%c" % (i, i & 255),
125 "adminDisplayName": "%d\x00b" % (n - i),
126 "title": "%d%sb" % (n - i, '\x00' * i),
127 "comment": "Favourite colour is %d" % (n % (i + 1)),
129 # Names that vary only in case. Windows returns
130 # equivalent addresses in the order they were put
131 # in ('a st', 'A st',...).
132 "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
135 if attrs is not None:
138 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
140 if opts.skip_attr_regex:
141 match = re.compile(opts.skip_attr_regex).search
142 for k in user.keys():
146 self.users.append(user)
151 super(VLVTests, self).setUp()
152 self.ldb = SamDB(host, credentials=creds,
153 session_info=system_session(lp), lp=lp)
155 self.base_dn = self.ldb.domain_dn()
156 self.ou = "ou=vlv,%s" % self.base_dn
157 if opts.delete_in_setup:
159 self.ldb.delete(self.ou, ['tree_delete:1'])
160 except ldb.LdbError as e:
161 print("tried deleting %s, got error %s" % (self.ou, e))
164 "objectclass": "organizationalUnit"})
167 for i in range(N_ELEMENTS):
168 self.create_user(i, N_ELEMENTS)
170 attrs = self.users[0].keys()
171 self.binary_sorted_keys = ['audio',
175 "displayNamePrintable"]
177 self.numeric_sorted_keys = ['flags',
180 self.timestamp_keys = ['msTSExpireDate4']
182 self.int64_keys = set(['accountExpires'])
184 self.locale_sorted_keys = [x for x in attrs if
185 x not in (self.binary_sorted_keys +
186 self.numeric_sorted_keys)]
188 # don't try spaces, etc in cn
189 self.delicate_keys = ['cn']
192 super(VLVTests, self).tearDown()
193 if not opts.delete_in_setup:
194 self.ldb.delete(self.ou, ['tree_delete:1'])
196 def get_full_list(self, attr, include_cn=False):
197 """Fetch the whole list sorted on the attribute, using the VLV.
198 This way you get a VLV cookie."""
199 n_users = len(self.users)
200 sort_control = "server_sort:1:0:%s" % attr
201 half_n = n_users // 2
202 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
206 res = self.ldb.search(self.ou,
207 scope=ldb.SCOPE_ONELEVEL,
209 controls=[sort_control,
212 full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
214 full_results = [str(x[attr][0]).lower() for x in res]
215 controls = res.controls
216 return full_results, controls, sort_control
218 def get_expected_order(self, attr, expression=None):
219 """Fetch the whole list sorted on the attribute, using sort only."""
220 sort_control = "server_sort:1:0:%s" % attr
221 res = self.ldb.search(self.ou,
222 scope=ldb.SCOPE_ONELEVEL,
223 expression=expression,
225 controls=[sort_control])
226 results = [x[attr][0] for x in res]
229 def delete_user(self, user):
230 self.ldb.delete(user['dn'])
231 del self.users[self.users.index(user)]
233 def get_gte_tests_and_order(self, attr, expression=None):
234 expected_order = self.get_expected_order(attr, expression=expression)
236 if attr in self.delicate_keys:
244 elif attr in self.timestamp_keys:
255 elif attr not in self.numeric_sorted_keys:
269 gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')
272 # "numeric" means positive integers
273 # doesn't work with -1, 3.14, ' 3', '9' * 20
280 if attr in self.int64_keys:
281 gte_keys += ['3' * 12, '71' * 8]
283 for i, x in enumerate(gte_keys):
284 user = self.create_user(i, N_ELEMENTS,
287 gte_users.append(user)
289 gte_order = self.get_expected_order(attr)
290 for user in gte_users:
291 self.delete_user(user)
294 expected_order_2 = self.get_expected_order(attr, expression=expression)
295 self.assertEqual(expected_order, expected_order_2)
297 # Map gte tests to indexes in expected order. This will break
298 # if gte_order and expected_order are differently ordered (as
302 # index to the first one with each value
304 for i, k in enumerate(expected_order):
305 if k not in index_map:
320 gte_map[k] = len(expected_order)
325 print(" %10s => %10s" % (k, gte_map[k]))
327 return gte_order, expected_order, gte_map
329 def assertCorrectResults(self, results, expected_order,
330 offset, before, after):
331 """A helper to calculate offsets correctly and say as much as possible
332 when something goes wrong."""
334 start = max(offset - before - 1, 0)
336 expected_results = expected_order[start: end]
338 # if it is a tuple with the cn, drop the cn
339 if expected_results and isinstance(expected_results[0], tuple):
340 expected_results = [x[0] for x in expected_results]
342 if expected_results == results:
345 if expected_order is not None:
346 print("expected order: %s" % expected_order[:20])
347 if len(expected_order) > 20:
348 print("... and %d more not shown" % (len(expected_order) - 20))
350 print("offset %d before %d after %d" % (offset, before, after))
351 print("start %d end %d" % (start, end))
352 print("expected: %s" % expected_results)
353 print("got : %s" % results)
354 self.assertEquals(expected_results, results)
356 def test_server_vlv_with_cookie(self):
357 attrs = [x for x in self.users[0].keys() if x not in
358 ('dn', 'objectclass')]
360 expected_order = self.get_expected_order(attr)
361 sort_control = "server_sort:1:0:%s" % attr
364 for before in [10, 0, 3, 1, 4, 5, 2]:
365 for after in [0, 3, 1, 4, 5, 2, 7]:
366 for offset in range(max(1, before - 2),
367 min(n - after + 2, n)):
369 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
372 cookie = get_cookie(res.controls, n)
373 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
374 (before, after, offset, n,
377 res = self.ldb.search(self.ou,
378 scope=ldb.SCOPE_ONELEVEL,
380 controls=[sort_control,
383 results = [x[attr][0] for x in res]
385 self.assertCorrectResults(results, expected_order,
386 offset, before, after)
388 def run_index_tests_with_expressions(self, expressions):
389 # Here we don't test every before/after combination.
390 attrs = [x for x in self.users[0].keys() if x not in
391 ('dn', 'objectclass')]
393 for expression in expressions:
394 expected_order = self.get_expected_order(attr, expression)
395 sort_control = "server_sort:1:0:%s" % attr
397 n = len(expected_order)
398 for before in range(0, 11):
400 for offset in range(max(1, before - 2),
401 min(n - after + 2, n)):
403 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
406 cookie = get_cookie(res.controls)
407 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
408 (before, after, offset, n,
411 res = self.ldb.search(self.ou,
412 expression=expression,
413 scope=ldb.SCOPE_ONELEVEL,
415 controls=[sort_control,
418 results = [x[attr][0] for x in res]
420 self.assertCorrectResults(results, expected_order,
421 offset, before, after)
423 def test_server_vlv_with_expression(self):
424 """What happens when we run the VLV with an expression?"""
425 expressions = ["(objectClass=*)",
426 "(cn=%s)" % self.users[-1]['cn'],
427 "(roomNumber=%s)" % self.users[0]['roomNumber'],
429 self.run_index_tests_with_expressions(expressions)
431 def test_server_vlv_with_failing_expression(self):
432 """What happens when we run the VLV on an expression that matches
434 expressions = ["(samaccountname=testferf)",
437 self.run_index_tests_with_expressions(expressions)
439 def run_gte_tests_with_expressions(self, expressions):
440 # Here we don't test every before/after combination.
441 attrs = [x for x in self.users[0].keys() if x not in
442 ('dn', 'objectclass')]
443 for expression in expressions:
445 gte_order, expected_order, gte_map = \
446 self.get_gte_tests_and_order(attr, expression)
447 # In case there is some order dependency, disorder tests
448 gte_tests = gte_order[:]
450 random.shuffle(gte_tests)
452 sort_control = "server_sort:1:0:%s" % attr
454 expected_order = self.get_expected_order(attr, expression)
455 sort_control = "server_sort:1:0:%s" % attr
457 for before in range(0, 11):
459 for gte in gte_tests:
461 cookie = get_cookie(res.controls)
464 vlv_search = encode_vlv_control(before=before,
469 res = self.ldb.search(self.ou,
470 scope=ldb.SCOPE_ONELEVEL,
471 expression=expression,
473 controls=[sort_control,
476 results = [x[attr][0] for x in res]
477 offset = gte_map.get(gte, len(expected_order))
479 # here offset is 0-based
480 start = max(offset - before, 0)
481 end = offset + 1 + after
483 expected_results = expected_order[start: end]
485 self.assertEquals(expected_results, results)
487 def test_vlv_gte_with_expression(self):
488 """What happens when we run the VLV with an expression?"""
489 expressions = ["(objectClass=*)",
490 "(cn=%s)" % self.users[-1]['cn'],
491 "(roomNumber=%s)" % self.users[0]['roomNumber'],
493 self.run_gte_tests_with_expressions(expressions)
495 def test_vlv_gte_with_failing_expression(self):
496 """What happens when we run the VLV on an expression that matches
498 expressions = ["(samaccountname=testferf)",
501 self.run_gte_tests_with_expressions(expressions)
503 def test_server_vlv_with_cookie_while_adding_and_deleting(self):
504 """What happens if we add or remove items in the middle of the VLV?
506 Nothing. The search and the sort is not repeated, and we only
507 deal with the objects originally found.
509 attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
510 ('dn', 'objectclass')]
514 full_results, controls, sort_control = \
515 self.get_full_list(attr, True)
516 original_n = len(self.users)
518 expected_order = full_results
521 for before in list(range(0, 3)) + [6, 11, 19]:
522 for after in list(range(0, 3)) + [6, 11, 19]:
523 start = max(before - 1, 1)
524 end = max(start + 4, original_n - after + 2)
525 for offset in range(start, end):
526 # if iteration > 2076:
528 cookie = get_cookie(controls, original_n)
529 vlv_search = encode_vlv_control(before=before,
536 res = self.ldb.search(self.ou,
537 scope=ldb.SCOPE_ONELEVEL,
539 controls=[sort_control,
542 controls = res.controls
543 results = [x[attr][0] for x in res]
544 real_offset = max(1, min(offset, len(expected_order)))
546 expected_results = []
548 begin_offset = max(real_offset - before - 1, 0)
549 real_before = min(before, real_offset - 1)
550 real_after = min(after,
551 len(expected_order) - real_offset)
553 for x in expected_order[begin_offset:]:
555 expected_results.append(get_bytes(x[0]))
556 if (len(expected_results) ==
557 real_before + real_after + 1):
562 if expected_results != results:
563 print("attr %s before %d after %d offset %d" %
564 (attr, before, after, offset))
565 self.assertEquals(expected_results, results)
568 if random.random() < 0.1 + (n < 5) * 0.05:
572 i = random.randrange(n)
573 user = self.create_user(i, n, suffix='-%s' %
576 if random.random() < 0.1 + (n > 50) * 0.02 and n:
577 index = random.randrange(n)
578 user = self.users.pop(index)
580 self.ldb.delete(user['dn'])
582 replaced = (user[attr], user['cn'])
583 if replaced in expected_order:
584 i = expected_order.index(replaced)
585 expected_order[i] = None
587 def test_server_vlv_with_cookie_while_changing(self):
588 """What happens if we modify items in the middle of the VLV?
590 The expected behaviour (as found on Windows) is the sort is
591 not repeated, but the changes in attributes are reflected.
593 attrs = [x for x in self.users[0].keys() if x not in
594 ('dn', 'objectclass', 'cn')]
596 n_users = len(self.users)
597 expected_order = [x.upper() for x in self.get_expected_order(attr)]
598 sort_control = "server_sort:1:0:%s" % attr
602 # First we'll fetch the whole list so we know the original
603 # sort order. This is necessary because we don't know how
604 # the server will order equivalent items. We are using the
606 half_n = n_users // 2
607 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
608 res = self.ldb.search(self.ou,
609 scope=ldb.SCOPE_ONELEVEL,
611 controls=[sort_control, vlv_search])
613 results = [x[attr][0].upper() for x in res]
614 #self.assertEquals(expected_order, results)
616 dn_order = [str(x['dn']) for x in res]
619 for before in range(0, 3):
620 for after in range(0, 3):
621 for offset in range(1 + before, n_users - after):
622 cookie = get_cookie(res.controls, len(self.users))
623 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
624 (before, after, offset, len(self.users),
627 res = self.ldb.search(self.ou,
628 scope=ldb.SCOPE_ONELEVEL,
630 controls=[sort_control,
633 dn_results = [str(x['dn']) for x in res]
634 dn_expected = dn_order[offset - before - 1:
637 self.assertEquals(dn_expected, dn_results)
639 results = [x[attr][0].upper() for x in res]
641 self.assertCorrectResults(results, values,
642 offset, before, after)
646 if (attr in self.locale_sorted_keys or
647 attr in self.binary_sorted_keys):
649 i2 = (i ^ 255) % n_users
654 if v2 in self.locale_sorted_keys:
656 cn1 = dn1.split(',', 1)[0][3:]
657 cn2 = dn2.split(',', 1)[0][3:]
662 m.dn = ldb.Dn(self.ldb, dn1)
663 m[attr] = ldb.MessageElement(v2,
664 ldb.FLAG_MOD_REPLACE,
669 def test_server_vlv_fractions_with_cookie(self):
670 """What happens when the count is set to an arbitrary number?
672 In that case the offset and the count form a fraction, and the
673 VLV should be centred at a point offset/count of the way
674 through. For example, if offset is 3 and count is 6, the VLV
675 should be looking around halfway. The actual algorithm is a
676 bit fiddlier than that, because of the one-basedness of VLV.
678 attrs = [x for x in self.users[0].keys() if x not in
679 ('dn', 'objectclass')]
681 n_users = len(self.users)
686 full_results, controls, sort_control = self.get_full_list(attr)
687 self.assertEqual(len(full_results), n_users)
688 for before in range(0, 2):
689 for after in range(0, 2):
690 for denominator in range(1, 20):
691 for offset in range(1, denominator + 3):
692 cookie = get_cookie(controls, len(self.users))
693 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
694 (before, after, offset,
698 res = self.ldb.search(self.ou,
699 scope=ldb.SCOPE_ONELEVEL,
701 controls=[sort_control,
703 except ldb.LdbError as e:
706 print("offset %d denominator %d raised error "
707 "expected error %s\n"
708 "(offset zero is illegal unless "
709 "content count is zero)" %
710 (offset, denominator, e))
713 results = [str(x[attr][0]).lower() for x in res]
716 denominator = n_users
719 elif denominator == 1:
720 # the offset can only be 1, but the 1/1 case
721 # means something special
723 real_offset = n_users
727 if offset > denominator:
730 int(round((n_users - 1) *
732 (denominator - 1.0)))
735 self.assertCorrectResults(results, full_results,
739 controls = res.controls
741 for c in list(controls):
743 if cstr.startswith('vlv_resp'):
744 bits = cstr.rsplit(':')
745 print("the answer is %s; we said %d" %
746 (bits[2], real_offset))
749 def test_server_vlv_no_cookie(self):
750 attrs = [x for x in self.users[0].keys() if x not in
751 ('dn', 'objectclass')]
754 expected_order = self.get_expected_order(attr)
755 sort_control = "server_sort:1:0:%s" % attr
756 for before in range(0, 5):
757 for after in range(0, 7):
758 for offset in range(1 + before, len(self.users) - after):
759 res = self.ldb.search(self.ou,
760 scope=ldb.SCOPE_ONELEVEL,
762 controls=[sort_control,
766 results = [x[attr][0] for x in res]
767 self.assertCorrectResults(results, expected_order,
768 offset, before, after)
770 def get_expected_order_showing_deleted(self, attr,
771 expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
773 scope=ldb.SCOPE_SUBTREE
775 """Fetch the whole list sorted on the attribute, using sort only,
776 searching in the entire tree, not just our OU. This is the
777 way to find deleted objects.
781 sort_control = "server_sort:1:0:%s" % attr
782 controls = [sort_control, "show_deleted:1"]
784 res = self.ldb.search(base,
786 expression=expression,
789 results = [x[attr][0] for x in res]
792 def add_deleted_users(self, n):
793 deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
796 for user in deleted_users:
797 self.delete_user(user)
799 def test_server_vlv_no_cookie_show_deleted(self):
800 """What do we see with the show_deleted control?"""
801 attrs = ['objectGUID',
810 # add some deleted users first, just in case there are none
811 self.add_deleted_users(6)
813 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
816 show_deleted_control = "show_deleted:1"
817 expected_order = self.get_expected_order_showing_deleted(attr,
819 n = len(expected_order)
820 sort_control = "server_sort:1:0:%s" % attr
821 for before in [3, 1, 0]:
823 # don't test every position, because there could be hundreds.
824 # jump back and forth instead
826 offset = random.randrange(max(1, before - 2),
827 min(n - after + 2, n))
828 res = self.ldb.search(self.base_dn,
829 expression=expression,
830 scope=ldb.SCOPE_SUBTREE,
832 controls=[sort_control,
833 show_deleted_control,
839 results = [x[attr][0] for x in res]
840 self.assertCorrectResults(results, expected_order,
841 offset, before, after)
843 def test_server_vlv_no_cookie_show_deleted_only(self):
844 """What do we see with the show_deleted control when we're not looking
845 at any non-deleted things"""
846 attrs = ['objectGUID',
853 # add some deleted users first, just in case there are none
854 self.add_deleted_users(4)
855 base = 'CN=Deleted Objects,%s' % self.base_dn
856 expression = "(cn=vlv-deleted*)"
858 show_deleted_control = "show_deleted:1"
859 expected_order = self.get_expected_order_showing_deleted(attr,
860 expression=expression,
862 scope=ldb.SCOPE_ONELEVEL)
863 print("searching for attr %s amongst %d deleted objects" %
864 (attr, len(expected_order)))
865 sort_control = "server_sort:1:0:%s" % attr
866 step = max(len(expected_order) // 10, 1)
867 for before in [3, 0]:
869 for offset in range(1 + before,
870 len(expected_order) - after,
872 res = self.ldb.search(base,
873 expression=expression,
874 scope=ldb.SCOPE_ONELEVEL,
876 controls=[sort_control,
877 show_deleted_control,
881 results = [x[attr][0] for x in res]
882 self.assertCorrectResults(results, expected_order,
883 offset, before, after)
885 def test_server_vlv_with_cookie_show_deleted(self):
886 """What do we see with the show_deleted control?"""
887 attrs = ['objectGUID',
895 self.add_deleted_users(6)
898 expected_order = self.get_expected_order(attr)
899 sort_control = "server_sort:1:0:%s" % attr
901 show_deleted_control = "show_deleted:1"
902 expected_order = self.get_expected_order_showing_deleted(attr)
903 n = len(expected_order)
904 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
905 for before in [3, 2, 1, 0]:
908 offset = random.randrange(max(1, before - 2),
909 min(n - after + 2, n))
911 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
914 cookie = get_cookie(res.controls, n)
915 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
916 (before, after, offset, n,
919 res = self.ldb.search(self.base_dn,
920 expression=expression,
921 scope=ldb.SCOPE_SUBTREE,
923 controls=[sort_control,
925 show_deleted_control])
927 results = [x[attr][0] for x in res]
929 self.assertCorrectResults(results, expected_order,
930 offset, before, after)
932 def test_server_vlv_gte_with_cookie(self):
933 attrs = [x for x in self.users[0].keys() if x not in
934 ('dn', 'objectclass')]
936 gte_order, expected_order, gte_map = \
937 self.get_gte_tests_and_order(attr)
938 # In case there is some order dependency, disorder tests
939 gte_tests = gte_order[:]
941 random.shuffle(gte_tests)
943 sort_control = "server_sort:1:0:%s" % attr
944 for before in [0, 1, 2, 4]:
945 for after in [0, 1, 3, 6]:
946 for gte in gte_tests:
948 cookie = get_cookie(res.controls, len(self.users))
951 vlv_search = encode_vlv_control(before=before,
956 res = self.ldb.search(self.ou,
957 scope=ldb.SCOPE_ONELEVEL,
959 controls=[sort_control,
962 results = [x[attr][0] for x in res]
963 offset = gte_map.get(gte, len(expected_order))
965 # here offset is 0-based
966 start = max(offset - before, 0)
967 end = offset + 1 + after
969 expected_results = expected_order[start: end]
971 self.assertEquals(expected_results, results)
973 def test_server_vlv_gte_no_cookie(self):
974 attrs = [x for x in self.users[0].keys() if x not in
975 ('dn', 'objectclass')]
978 gte_order, expected_order, gte_map = \
979 self.get_gte_tests_and_order(attr)
980 # In case there is some order dependency, disorder tests
981 gte_tests = gte_order[:]
983 random.shuffle(gte_tests)
985 sort_control = "server_sort:1:0:%s" % attr
986 for before in [0, 1, 3]:
988 for gte in gte_tests:
989 vlv_search = encode_vlv_control(before=before,
993 res = self.ldb.search(self.ou,
994 scope=ldb.SCOPE_ONELEVEL,
996 controls=[sort_control,
998 results = [x[attr][0] for x in res]
1000 # here offset is 0-based
1001 offset = gte_map.get(gte, len(expected_order))
1002 start = max(offset - before, 0)
1003 end = offset + after + 1
1004 expected_results = expected_order[start: end]
1006 if expected_results != results:
1007 middle = expected_order[len(expected_order) // 2]
1008 print(expected_results, results)
1010 print(expected_order)
1012 print("\nattr %s offset %d before %d "
1014 (attr, offset, before, after, gte))
1015 self.assertEquals(expected_results, results)
1017 def test_multiple_searches(self):
1018 """The maximum number of concurrent vlv searches per connection is
1019 currently set at 3. That means if you open 4 VLV searches the
1020 cookie on the first one should fail.
1022 # Windows has a limit of 10 VLVs where there are low numbers
1023 # of objects in each search.
1024 attrs = ([x for x in self.users[0].keys() if x not in
1025 ('dn', 'objectclass')] * 2)[:12]
1029 sort_control = "server_sort:1:0:%s" % attr
1031 res = self.ldb.search(self.ou,
1032 scope=ldb.SCOPE_ONELEVEL,
1034 controls=[sort_control,
1037 cookie = get_cookie(res.controls, len(self.users))
1038 vlv_cookies.append(cookie)
1041 # now this one should fail
1042 self.assertRaises(ldb.LdbError,
1045 scope=ldb.SCOPE_ONELEVEL,
1047 controls=[sort_control,
1048 "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1050 # and this one should succeed
1051 res = self.ldb.search(self.ou,
1052 scope=ldb.SCOPE_ONELEVEL,
1054 controls=[sort_control,
1055 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1057 # this one should fail because it is a new connection and
1058 # doesn't share cookies
1059 new_ldb = SamDB(host, credentials=creds,
1060 session_info=system_session(lp), lp=lp)
1062 self.assertRaises(ldb.LdbError,
1063 new_ldb.search, self.ou,
1064 scope=ldb.SCOPE_ONELEVEL,
1066 controls=[sort_control,
1067 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1069 # but now without the critical flag it just does no VLV.
1070 new_ldb.search(self.ou,
1071 scope=ldb.SCOPE_ONELEVEL,
1073 controls=[sort_control,
1074 "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1077 if "://" not in host:
1078 if os.path.isfile(host):
1079 host = "tdb://%s" % host
1081 host = "ldap://%s" % host
1084 TestProgram(module=__name__, opts=subunitopts)