29cc1718c78d4f147f091d390100de14f116ca70
[samba.git] / source4 / dsdb / tests / python / vlv.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import base64
9 import random
10 import re
11
12 sys.path.insert(0, "bin/python")
13 import samba
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 import ldb
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
23
24 import time
25
26 parser = optparse.OptionParser("vlv.py [options] <host>")
27 sambaopts = options.SambaOptions(parser)
28 parser.add_option_group(sambaopts)
29 parser.add_option_group(options.VersionOptions(parser))
30 # use command line creds if available
31 credopts = options.CredentialsOptions(parser)
32 parser.add_option_group(credopts)
33 subunitopts = SubunitOptions(parser)
34 parser.add_option_group(subunitopts)
35
36 parser.add_option('--elements', type='int', default=20,
37                   help="use this many elements in the tests")
38
39 parser.add_option('--delete-in-setup', action='store_true',
40                   help="cleanup in next setup rather than teardown")
41
42 parser.add_option('--skip-attr-regex',
43                   help="ignore attributes matching this regex")
44
45 opts, args = parser.parse_args()
46
47 if len(args) < 1:
48     parser.print_usage()
49     sys.exit(1)
50
51 host = args[0]
52
53 lp = sambaopts.get_loadparm()
54 creds = credopts.get_credentials(lp)
55
56 N_ELEMENTS = opts.elements
57
58
59 class VlvTestException(Exception):
60     pass
61
62
63 def encode_vlv_control(critical=1,
64                        before=0, after=0,
65                        offset=None,
66                        gte=None,
67                        n=0, cookie=None):
68
69     s = "vlv:%d:%d:%d:" % (critical, before, after)
70
71     if offset is not None:
72         m = "%d:%d" % (offset, n)
73     elif b':' in gte or b'\x00' in gte:
74         gte = get_string(base64.b64encode(gte))
75         m = "base64>=%s" % gte
76     else:
77         m = ">=%s" % get_string(gte)
78
79     if cookie is None:
80         return s + m
81
82     return s + m + ':' + cookie
83
84
85 def get_cookie(controls, expected_n=None):
86     """Get the cookie, STILL base64 encoded, or raise ValueError."""
87     for c in list(controls):
88         cstr = str(c)
89         if cstr.startswith('vlv_resp'):
90             head, n, _, cookie = cstr.rsplit(':', 3)
91             if expected_n is not None and int(n) != expected_n:
92                 raise ValueError("Expected %s items, server said %s" %
93                                  (expected_n, n))
94             return cookie
95     raise ValueError("there is no VLV response")
96
97
98 class TestsWithUserOU(samba.tests.TestCase):
99
100     def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
101         name = "%s%d%s" % (prefix, i, suffix)
102         user = {
103             'cn': name,
104             "objectclass": "user",
105             'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
106             "roomNumber": "%sbc" % (n - i),
107             "carLicense": "后来经",
108             "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
109             "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
110             "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
111             "flags": str(i * (n - i)),
112             "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
113                                             ' 3z}'[i & 3],
114                                             '"@'[i & 1],),
115         }
116
117         # _user_broken_attrs tests are broken due to problems outside
118         # of VLV.
119         _user_broken_attrs = {
120             # Sort doesn't look past a NUL byte.
121             "photo": "\x00%d" % (n - i),
122             "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
123                                                              chr(i & 255), i),
124             "displayNamePrintable": "%d\x00%c" % (i, i & 255),
125             "adminDisplayName": "%d\x00b" % (n - i),
126             "title": "%d%sb" % (n - i, '\x00' * i),
127             "comment": "Favourite colour is %d" % (n % (i + 1)),
128
129             # Names that vary only in case. Windows returns
130             # equivalent addresses in the order they were put
131             # in ('a st', 'A st',...).
132             "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
133         }
134
135         if attrs is not None:
136             user.update(attrs)
137
138         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
139
140         if opts.skip_attr_regex:
141             match = re.compile(opts.skip_attr_regex).search
142             for k in user.keys():
143                 if match(k):
144                     del user[k]
145
146         self.users.append(user)
147         self.ldb.add(user)
148         return user
149
150     def setUp(self):
151         super(TestsWithUserOU, self).setUp()
152         self.ldb = SamDB(host, credentials=creds,
153                          session_info=system_session(lp), lp=lp)
154
155         self.base_dn = self.ldb.domain_dn()
156         self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn
157         self.ou = "ou=vlvou,%s" % self.tree_dn
158         if opts.delete_in_setup:
159             try:
160                 self.ldb.delete(self.tree_dn, ['tree_delete:1'])
161             except ldb.LdbError as e:
162                 print("tried deleting %s, got error %s" % (self.tree_dn, e))
163         self.ldb.add({
164             "dn": self.tree_dn,
165             "objectclass": "organizationalUnit"})
166         self.ldb.add({
167             "dn": self.ou,
168             "objectclass": "organizationalUnit"})
169
170         self.users = []
171         for i in range(N_ELEMENTS):
172             self.create_user(i, N_ELEMENTS)
173
174         attrs = self.users[0].keys()
175         self.binary_sorted_keys = ['audio',
176                                    'photo',
177                                    "msTSExpireDate4",
178                                    'serialNumber',
179                                    "displayNamePrintable"]
180
181         self.numeric_sorted_keys = ['flags',
182                                     'accountExpires']
183
184         self.timestamp_keys = ['msTSExpireDate4']
185
186         self.int64_keys = set(['accountExpires'])
187
188         self.locale_sorted_keys = [x for x in attrs if
189                                    x not in (self.binary_sorted_keys +
190                                              self.numeric_sorted_keys)]
191
192         # don't try spaces, etc in cn
193         self.delicate_keys = ['cn']
194
195     def tearDown(self):
196         super(TestsWithUserOU, self).tearDown()
197         if not opts.delete_in_setup:
198             self.ldb.delete(self.tree_dn, ['tree_delete:1'])
199
200
201 class VLVTests(TestsWithUserOU):
202
203     def get_full_list(self, attr, include_cn=False):
204         """Fetch the whole list sorted on the attribute, using the VLV.
205         This way you get a VLV cookie."""
206         n_users = len(self.users)
207         sort_control = "server_sort:1:0:%s" % attr
208         half_n = n_users // 2
209         vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
210         attrs = [attr]
211         if include_cn:
212             attrs.append('cn')
213         res = self.ldb.search(self.ou,
214                               scope=ldb.SCOPE_ONELEVEL,
215                               attrs=attrs,
216                               controls=[sort_control,
217                                         vlv_search])
218         if include_cn:
219             full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
220         else:
221             full_results = [str(x[attr][0]).lower() for x in res]
222         controls = res.controls
223         return full_results, controls, sort_control
224
225     def get_expected_order(self, attr, expression=None):
226         """Fetch the whole list sorted on the attribute, using sort only."""
227         sort_control = "server_sort:1:0:%s" % attr
228         res = self.ldb.search(self.ou,
229                               scope=ldb.SCOPE_ONELEVEL,
230                               expression=expression,
231                               attrs=[attr],
232                               controls=[sort_control])
233         results = [x[attr][0] for x in res]
234         return results
235
236     def delete_user(self, user):
237         self.ldb.delete(user['dn'])
238         del self.users[self.users.index(user)]
239
240     def get_gte_tests_and_order(self, attr, expression=None):
241         expected_order = self.get_expected_order(attr, expression=expression)
242         gte_users = []
243         if attr in self.delicate_keys:
244             gte_keys = [
245                 '3',
246                 'abc',
247                 '¹',
248                 'ŋđ¼³ŧ“«đð',
249                 '桑巴',
250             ]
251         elif attr in self.timestamp_keys:
252             gte_keys = [
253                 '18560101010000.0Z',
254                 '19140103010000.0Z',
255                 '19560101010010.0Z',
256                 '19700101000000.0Z',
257                 '19991231211234.3Z',
258                 '20061111211234.0Z',
259                 '20390901041234.0Z',
260                 '25560101010000.0Z',
261             ]
262         elif attr not in self.numeric_sorted_keys:
263             gte_keys = [
264                 '3',
265                 'abc',
266                 ' ',
267                 '!@#!@#!',
268                 'kōkako',
269                 '¹',
270                 'ŋđ¼³ŧ“«đð',
271                 '\n\t\t',
272                 '桑巴',
273                 'zzzz',
274             ]
275             if expected_order:
276                 gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')
277
278         else:
279             # "numeric" means positive integers
280             # doesn't work with -1, 3.14, ' 3', '9' * 20
281             gte_keys = ['3',
282                         '1' * 10,
283                         '1',
284                         '9' * 7,
285                         '0']
286
287             if attr in self.int64_keys:
288                 gte_keys += ['3' * 12, '71' * 8]
289
290         for i, x in enumerate(gte_keys):
291             user = self.create_user(i, N_ELEMENTS,
292                                     prefix='gte',
293                                     attrs={attr: x})
294             gte_users.append(user)
295
296         gte_order = self.get_expected_order(attr)
297         for user in gte_users:
298             self.delete_user(user)
299
300         # for sanity's sake
301         expected_order_2 = self.get_expected_order(attr, expression=expression)
302         self.assertEqual(expected_order, expected_order_2)
303
304         # Map gte tests to indexes in expected order. This will break
305         # if gte_order and expected_order are differently ordered (as
306         # it should).
307         gte_map = {}
308
309         # index to the first one with each value
310         index_map = {}
311         for i, k in enumerate(expected_order):
312             if k not in index_map:
313                 index_map[k] = i
314
315         keys = []
316         for k in gte_order:
317             if k in index_map:
318                 i = index_map[k]
319                 gte_map[k] = i
320                 for k in keys:
321                     gte_map[k] = i
322                 keys = []
323             else:
324                 keys.append(k)
325
326         for k in keys:
327             gte_map[k] = len(expected_order)
328
329         if False:
330             print("gte_map:")
331             for k in gte_order:
332                 print("   %10s => %10s" % (k, gte_map[k]))
333
334         return gte_order, expected_order, gte_map
335
336     def assertCorrectResults(self, results, expected_order,
337                              offset, before, after):
338         """A helper to calculate offsets correctly and say as much as possible
339         when something goes wrong."""
340
341         start = max(offset - before - 1, 0)
342         end = offset + after
343         expected_results = expected_order[start: end]
344
345         # if it is a tuple with the cn, drop the cn
346         if expected_results and isinstance(expected_results[0], tuple):
347             expected_results = [x[0] for x in expected_results]
348
349         if expected_results == results:
350             return
351
352         if expected_order is not None:
353             print("expected order: %s" % expected_order[:20])
354             if len(expected_order) > 20:
355                 print("... and %d more not shown" % (len(expected_order) - 20))
356
357         print("offset %d before %d after %d" % (offset, before, after))
358         print("start %d end %d" % (start, end))
359         print("expected: %s" % expected_results)
360         print("got     : %s" % results)
361         self.assertEquals(expected_results, results)
362
363     def test_server_vlv_with_cookie(self):
364         attrs = [x for x in self.users[0].keys() if x not in
365                  ('dn', 'objectclass')]
366         for attr in attrs:
367             expected_order = self.get_expected_order(attr)
368             sort_control = "server_sort:1:0:%s" % attr
369             res = None
370             n = len(self.users)
371             for before in [10, 0, 3, 1, 4, 5, 2]:
372                 for after in [0, 3, 1, 4, 5, 2, 7]:
373                     for offset in range(max(1, before - 2),
374                                         min(n - after + 2, n)):
375                         if res is None:
376                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
377                                                                offset)
378                         else:
379                             cookie = get_cookie(res.controls, n)
380                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
381                                           (before, after, offset, n,
382                                            cookie))
383
384                         res = self.ldb.search(self.ou,
385                                               scope=ldb.SCOPE_ONELEVEL,
386                                               attrs=[attr],
387                                               controls=[sort_control,
388                                                         vlv_search])
389
390                         results = [x[attr][0] for x in res]
391
392                         self.assertCorrectResults(results, expected_order,
393                                                   offset, before, after)
394
395     def run_index_tests_with_expressions(self, expressions):
396         # Here we don't test every before/after combination.
397         attrs = [x for x in self.users[0].keys() if x not in
398                  ('dn', 'objectclass')]
399         for attr in attrs:
400             for expression in expressions:
401                 expected_order = self.get_expected_order(attr, expression)
402                 sort_control = "server_sort:1:0:%s" % attr
403                 res = None
404                 n = len(expected_order)
405                 for before in range(0, 11):
406                     after = before
407                     for offset in range(max(1, before - 2),
408                                         min(n - after + 2, n)):
409                         if res is None:
410                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
411                                                                offset)
412                         else:
413                             cookie = get_cookie(res.controls)
414                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
415                                           (before, after, offset, n,
416                                            cookie))
417
418                         res = self.ldb.search(self.ou,
419                                               expression=expression,
420                                               scope=ldb.SCOPE_ONELEVEL,
421                                               attrs=[attr],
422                                               controls=[sort_control,
423                                                         vlv_search])
424
425                         results = [x[attr][0] for x in res]
426
427                         self.assertCorrectResults(results, expected_order,
428                                                   offset, before, after)
429
430     def test_server_vlv_with_expression(self):
431         """What happens when we run the VLV with an expression?"""
432         expressions = ["(objectClass=*)",
433                        "(cn=%s)" % self.users[-1]['cn'],
434                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
435                        ]
436         self.run_index_tests_with_expressions(expressions)
437
438     def test_server_vlv_with_failing_expression(self):
439         """What happens when we run the VLV on an expression that matches
440         nothing?"""
441         expressions = ["(samaccountname=testferf)",
442                        "(cn=hefalump)",
443                        ]
444         self.run_index_tests_with_expressions(expressions)
445
446     def run_gte_tests_with_expressions(self, expressions):
447         # Here we don't test every before/after combination.
448         attrs = [x for x in self.users[0].keys() if x not in
449                  ('dn', 'objectclass')]
450         for expression in expressions:
451             for attr in attrs:
452                 gte_order, expected_order, gte_map = \
453                     self.get_gte_tests_and_order(attr, expression)
454                 # In case there is some order dependency, disorder tests
455                 gte_tests = gte_order[:]
456                 random.seed(2)
457                 random.shuffle(gte_tests)
458                 res = None
459                 sort_control = "server_sort:1:0:%s" % attr
460
461                 expected_order = self.get_expected_order(attr, expression)
462                 sort_control = "server_sort:1:0:%s" % attr
463                 res = None
464                 for before in range(0, 11):
465                     after = before
466                     for gte in gte_tests:
467                         if res is not None:
468                             cookie = get_cookie(res.controls)
469                         else:
470                             cookie = None
471                         vlv_search = encode_vlv_control(before=before,
472                                                         after=after,
473                                                         gte=get_bytes(gte),
474                                                         cookie=cookie)
475
476                         res = self.ldb.search(self.ou,
477                                               scope=ldb.SCOPE_ONELEVEL,
478                                               expression=expression,
479                                               attrs=[attr],
480                                               controls=[sort_control,
481                                                         vlv_search])
482
483                         results = [x[attr][0] for x in res]
484                         offset = gte_map.get(gte, len(expected_order))
485
486                         # here offset is 0-based
487                         start = max(offset - before, 0)
488                         end = offset + 1 + after
489
490                         expected_results = expected_order[start: end]
491
492                         self.assertEquals(expected_results, results)
493
494     def test_vlv_gte_with_expression(self):
495         """What happens when we run the VLV with an expression?"""
496         expressions = ["(objectClass=*)",
497                        "(cn=%s)" % self.users[-1]['cn'],
498                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
499                        ]
500         self.run_gte_tests_with_expressions(expressions)
501
502     def test_vlv_gte_with_failing_expression(self):
503         """What happens when we run the VLV on an expression that matches
504         nothing?"""
505         expressions = ["(samaccountname=testferf)",
506                        "(cn=hefalump)",
507                        ]
508         self.run_gte_tests_with_expressions(expressions)
509
510     def test_server_vlv_with_cookie_while_adding_and_deleting(self):
511         """What happens if we add or remove items in the middle of the VLV?
512
513         Nothing. The search and the sort is not repeated, and we only
514         deal with the objects originally found.
515         """
516         attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
517                           ('dn', 'objectclass')]
518         user_number = 0
519         iteration = 0
520         for attr in attrs:
521             full_results, controls, sort_control = \
522                             self.get_full_list(attr, True)
523             original_n = len(self.users)
524
525             expected_order = full_results
526             random.seed(1)
527
528             for before in list(range(0, 3)) + [6, 11, 19]:
529                 for after in list(range(0, 3)) + [6, 11, 19]:
530                     start = max(before - 1, 1)
531                     end = max(start + 4, original_n - after + 2)
532                     for offset in range(start, end):
533                         # if iteration > 2076:
534                         #    return
535                         cookie = get_cookie(controls, original_n)
536                         vlv_search = encode_vlv_control(before=before,
537                                                         after=after,
538                                                         offset=offset,
539                                                         n=original_n,
540                                                         cookie=cookie)
541
542                         iteration += 1
543                         res = self.ldb.search(self.ou,
544                                               scope=ldb.SCOPE_ONELEVEL,
545                                               attrs=[attr],
546                                               controls=[sort_control,
547                                                         vlv_search])
548
549                         controls = res.controls
550                         results = [x[attr][0] for x in res]
551                         real_offset = max(1, min(offset, len(expected_order)))
552
553                         expected_results = []
554                         skipped = 0
555                         begin_offset = max(real_offset - before - 1, 0)
556                         real_before = min(before, real_offset - 1)
557                         real_after = min(after,
558                                          len(expected_order) - real_offset)
559
560                         for x in expected_order[begin_offset:]:
561                             if x is not None:
562                                 expected_results.append(get_bytes(x[0]))
563                                 if (len(expected_results) ==
564                                     real_before + real_after + 1):
565                                     break
566                             else:
567                                 skipped += 1
568
569                         if expected_results != results:
570                             print("attr %s before %d after %d offset %d" %
571                                   (attr, before, after, offset))
572                         self.assertEquals(expected_results, results)
573
574                         n = len(self.users)
575                         if random.random() < 0.1 + (n < 5) * 0.05:
576                             if n == 0:
577                                 i = 0
578                             else:
579                                 i = random.randrange(n)
580                             user = self.create_user(i, n, suffix='-%s' %
581                                                     user_number)
582                             user_number += 1
583                         if random.random() < 0.1  + (n > 50) * 0.02 and n:
584                             index = random.randrange(n)
585                             user = self.users.pop(index)
586
587                             self.ldb.delete(user['dn'])
588
589                             replaced = (user[attr], user['cn'])
590                             if replaced in expected_order:
591                                 i = expected_order.index(replaced)
592                                 expected_order[i] = None
593
594     def test_server_vlv_with_cookie_while_changing(self):
595         """What happens if we modify items in the middle of the VLV?
596
597         The expected behaviour (as found on Windows) is the sort is
598         not repeated, but the changes in attributes are reflected.
599         """
600         attrs = [x for x in self.users[0].keys() if x not in
601                  ('dn', 'objectclass', 'cn')]
602         for attr in attrs:
603             n_users = len(self.users)
604             expected_order = [x.upper() for x in self.get_expected_order(attr)]
605             sort_control = "server_sort:1:0:%s" % attr
606             res = None
607             i = 0
608
609             # First we'll fetch the whole list so we know the original
610             # sort order. This is necessary because we don't know how
611             # the server will order equivalent items. We are using the
612             # dn as a key.
613             half_n = n_users // 2
614             vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
615             res = self.ldb.search(self.ou,
616                                   scope=ldb.SCOPE_ONELEVEL,
617                                   attrs=['dn', attr],
618                                   controls=[sort_control, vlv_search])
619
620             results = [x[attr][0].upper() for x in res]
621             #self.assertEquals(expected_order, results)
622
623             dn_order = [str(x['dn']) for x in res]
624             values = results[:]
625
626             for before in range(0, 3):
627                 for after in range(0, 3):
628                     for offset in range(1 + before, n_users - after):
629                         cookie = get_cookie(res.controls, len(self.users))
630                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
631                                       (before, after, offset, len(self.users),
632                                        cookie))
633
634                         res = self.ldb.search(self.ou,
635                                               scope=ldb.SCOPE_ONELEVEL,
636                                               attrs=['dn', attr],
637                                               controls=[sort_control,
638                                                         vlv_search])
639
640                         dn_results = [str(x['dn']) for x in res]
641                         dn_expected = dn_order[offset - before - 1:
642                                                offset + after]
643
644                         self.assertEquals(dn_expected, dn_results)
645
646                         results = [x[attr][0].upper() for x in res]
647
648                         self.assertCorrectResults(results, values,
649                                                   offset, before, after)
650
651                         i += 1
652                         if i % 3 == 2:
653                             if (attr in self.locale_sorted_keys or
654                                 attr in self.binary_sorted_keys):
655                                 i1 = i % n_users
656                                 i2 = (i ^ 255) % n_users
657                                 dn1 = dn_order[i1]
658                                 dn2 = dn_order[i2]
659                                 v2 = values[i2]
660
661                                 if v2 in self.locale_sorted_keys:
662                                     v2 += '-%d' % i
663                                 cn1 = dn1.split(',', 1)[0][3:]
664                                 cn2 = dn2.split(',', 1)[0][3:]
665
666                                 values[i1] = v2
667
668                                 m = ldb.Message()
669                                 m.dn = ldb.Dn(self.ldb, dn1)
670                                 m[attr] = ldb.MessageElement(v2,
671                                                              ldb.FLAG_MOD_REPLACE,
672                                                              attr)
673
674                                 self.ldb.modify(m)
675
676     def test_server_vlv_fractions_with_cookie(self):
677         """What happens when the count is set to an arbitrary number?
678
679         In that case the offset and the count form a fraction, and the
680         VLV should be centred at a point offset/count of the way
681         through. For example, if offset is 3 and count is 6, the VLV
682         should be looking around halfway. The actual algorithm is a
683         bit fiddlier than that, because of the one-basedness of VLV.
684         """
685         attrs = [x for x in self.users[0].keys() if x not in
686                  ('dn', 'objectclass')]
687
688         n_users = len(self.users)
689
690         random.seed(4)
691
692         for attr in attrs:
693             full_results, controls, sort_control = self.get_full_list(attr)
694             self.assertEqual(len(full_results), n_users)
695             for before in range(0, 2):
696                 for after in range(0, 2):
697                     for denominator in range(1, 20):
698                         for offset in range(1, denominator + 3):
699                             cookie = get_cookie(controls, len(self.users))
700                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
701                                           (before, after, offset,
702                                            denominator,
703                                            cookie))
704                             try:
705                                 res = self.ldb.search(self.ou,
706                                                       scope=ldb.SCOPE_ONELEVEL,
707                                                       attrs=[attr],
708                                                       controls=[sort_control,
709                                                                 vlv_search])
710                             except ldb.LdbError as e:
711                                 if offset != 0:
712                                     raise
713                                 print("offset %d denominator %d raised error "
714                                       "expected error %s\n"
715                                       "(offset zero is illegal unless "
716                                       "content count is zero)" %
717                                       (offset, denominator, e))
718                                 continue
719
720                             results = [str(x[attr][0]).lower() for x in res]
721
722                             if denominator == 0:
723                                 denominator = n_users
724                                 if offset == 0:
725                                     offset = denominator
726                             elif denominator == 1:
727                                 # the offset can only be 1, but the 1/1 case
728                                 # means something special
729                                 if offset == 1:
730                                     real_offset = n_users
731                                 else:
732                                     real_offset = 1
733                             else:
734                                 if offset > denominator:
735                                     offset = denominator
736                                 real_offset = (1 +
737                                                int(round((n_users - 1) *
738                                                          (offset - 1) /
739                                                          (denominator - 1.0)))
740                                                )
741
742                             self.assertCorrectResults(results, full_results,
743                                                       real_offset, before,
744                                                       after)
745
746                             controls = res.controls
747                             if False:
748                                 for c in list(controls):
749                                     cstr = str(c)
750                                     if cstr.startswith('vlv_resp'):
751                                         bits = cstr.rsplit(':')
752                                         print("the answer is %s; we said %d" %
753                                               (bits[2], real_offset))
754                                         break
755
756     def test_server_vlv_no_cookie(self):
757         attrs = [x for x in self.users[0].keys() if x not in
758                  ('dn', 'objectclass')]
759
760         for attr in attrs:
761             expected_order = self.get_expected_order(attr)
762             sort_control = "server_sort:1:0:%s" % attr
763             for before in range(0, 5):
764                 for after in range(0, 7):
765                     for offset in range(1 + before, len(self.users) - after):
766                         res = self.ldb.search(self.ou,
767                                               scope=ldb.SCOPE_ONELEVEL,
768                                               attrs=[attr],
769                                               controls=[sort_control,
770                                                         "vlv:1:%d:%d:%d:0" %
771                                                         (before, after,
772                                                          offset)])
773                         results = [x[attr][0] for x in res]
774                         self.assertCorrectResults(results, expected_order,
775                                                   offset, before, after)
776
777     def get_expected_order_showing_deleted(self, attr,
778                                            expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
779                                            base=None,
780                                            scope=ldb.SCOPE_SUBTREE
781                                            ):
782         """Fetch the whole list sorted on the attribute, using sort only,
783         searching in the entire tree, not just our OU. This is the
784         way to find deleted objects.
785         """
786         if base is None:
787             base = self.base_dn
788         sort_control = "server_sort:1:0:%s" % attr
789         controls = [sort_control, "show_deleted:1"]
790
791         res = self.ldb.search(base,
792                               scope=scope,
793                               expression=expression,
794                               attrs=[attr],
795                               controls=controls)
796         results = [x[attr][0] for x in res]
797         return results
798
799     def add_deleted_users(self, n):
800         deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
801                          for i in range(n)]
802
803         for user in deleted_users:
804             self.delete_user(user)
805
806     def test_server_vlv_no_cookie_show_deleted(self):
807         """What do we see with the show_deleted control?"""
808         attrs = ['objectGUID',
809                  'cn',
810                  'sAMAccountName',
811                  'objectSid',
812                  'name',
813                  'whenChanged',
814                  'usnChanged'
815                  ]
816
817         # add some deleted users first, just in case there are none
818         self.add_deleted_users(6)
819         random.seed(22)
820         expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
821
822         for attr in attrs:
823             show_deleted_control = "show_deleted:1"
824             expected_order = self.get_expected_order_showing_deleted(attr,
825                                                                      expression)
826             n = len(expected_order)
827             sort_control = "server_sort:1:0:%s" % attr
828             for before in [3, 1, 0]:
829                 for after in [0, 2]:
830                     # don't test every position, because there could be hundreds.
831                     # jump back and forth instead
832                     for i in range(20):
833                         offset = random.randrange(max(1, before - 2),
834                                                   min(n - after + 2, n))
835                         res = self.ldb.search(self.base_dn,
836                                               expression=expression,
837                                               scope=ldb.SCOPE_SUBTREE,
838                                               attrs=[attr],
839                                               controls=[sort_control,
840                                                         show_deleted_control,
841                                                         "vlv:1:%d:%d:%d:0" %
842                                                         (before, after,
843                                                          offset)
844                                                         ]
845                                               )
846                         results = [x[attr][0] for x in res]
847                         self.assertCorrectResults(results, expected_order,
848                                                   offset, before, after)
849
850     def test_server_vlv_no_cookie_show_deleted_only(self):
851         """What do we see with the show_deleted control when we're not looking
852         at any non-deleted things"""
853         attrs = ['objectGUID',
854                  'cn',
855                  'sAMAccountName',
856                  'objectSid',
857                  'whenChanged',
858                  ]
859
860         # add some deleted users first, just in case there are none
861         self.add_deleted_users(4)
862         base = 'CN=Deleted Objects,%s' % self.base_dn
863         expression = "(cn=vlv-deleted*)"
864         for attr in attrs:
865             show_deleted_control = "show_deleted:1"
866             expected_order = self.get_expected_order_showing_deleted(attr,
867                                                                      expression=expression,
868                                                                      base=base,
869                                                                      scope=ldb.SCOPE_ONELEVEL)
870             print("searching for attr %s amongst %d deleted objects" %
871                   (attr, len(expected_order)))
872             sort_control = "server_sort:1:0:%s" % attr
873             step = max(len(expected_order) // 10, 1)
874             for before in [3, 0]:
875                 for after in [0, 2]:
876                     for offset in range(1 + before,
877                                         len(expected_order) - after,
878                                         step):
879                         res = self.ldb.search(base,
880                                               expression=expression,
881                                               scope=ldb.SCOPE_ONELEVEL,
882                                               attrs=[attr],
883                                               controls=[sort_control,
884                                                         show_deleted_control,
885                                                         "vlv:1:%d:%d:%d:0" %
886                                                         (before, after,
887                                                          offset)])
888                         results = [x[attr][0] for x in res]
889                         self.assertCorrectResults(results, expected_order,
890                                                   offset, before, after)
891
892     def test_server_vlv_with_cookie_show_deleted(self):
893         """What do we see with the show_deleted control?"""
894         attrs = ['objectGUID',
895                  'cn',
896                  'sAMAccountName',
897                  'objectSid',
898                  'name',
899                  'whenChanged',
900                  'usnChanged'
901                  ]
902         self.add_deleted_users(6)
903         random.seed(23)
904         for attr in attrs:
905             expected_order = self.get_expected_order(attr)
906             sort_control = "server_sort:1:0:%s" % attr
907             res = None
908             show_deleted_control = "show_deleted:1"
909             expected_order = self.get_expected_order_showing_deleted(attr)
910             n = len(expected_order)
911             expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
912             for before in [3, 2, 1, 0]:
913                 after = before
914                 for i in range(20):
915                     offset = random.randrange(max(1, before - 2),
916                                               min(n - after + 2, n))
917                     if res is None:
918                         vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
919                                                            offset)
920                     else:
921                         cookie = get_cookie(res.controls, n)
922                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
923                                       (before, after, offset, n,
924                                        cookie))
925
926                     res = self.ldb.search(self.base_dn,
927                                           expression=expression,
928                                           scope=ldb.SCOPE_SUBTREE,
929                                           attrs=[attr],
930                                           controls=[sort_control,
931                                                     vlv_search,
932                                                     show_deleted_control])
933
934                     results = [x[attr][0] for x in res]
935
936                     self.assertCorrectResults(results, expected_order,
937                                               offset, before, after)
938
939     def test_server_vlv_gte_with_cookie(self):
940         attrs = [x for x in self.users[0].keys() if x not in
941                  ('dn', 'objectclass')]
942         for attr in attrs:
943             gte_order, expected_order, gte_map = \
944                                         self.get_gte_tests_and_order(attr)
945             # In case there is some order dependency, disorder tests
946             gte_tests = gte_order[:]
947             random.seed(1)
948             random.shuffle(gte_tests)
949             res = None
950             sort_control = "server_sort:1:0:%s" % attr
951             for before in [0, 1, 2, 4]:
952                 for after in [0, 1, 3, 6]:
953                     for gte in gte_tests:
954                         if res is not None:
955                             cookie = get_cookie(res.controls, len(self.users))
956                         else:
957                             cookie = None
958                         vlv_search = encode_vlv_control(before=before,
959                                                         after=after,
960                                                         gte=get_bytes(gte),
961                                                         cookie=cookie)
962
963                         res = self.ldb.search(self.ou,
964                                               scope=ldb.SCOPE_ONELEVEL,
965                                               attrs=[attr],
966                                               controls=[sort_control,
967                                                         vlv_search])
968
969                         results = [x[attr][0] for x in res]
970                         offset = gte_map.get(gte, len(expected_order))
971
972                         # here offset is 0-based
973                         start = max(offset - before, 0)
974                         end = offset + 1 + after
975
976                         expected_results = expected_order[start: end]
977
978                         self.assertEquals(expected_results, results)
979
980     def test_server_vlv_gte_no_cookie(self):
981         attrs = [x for x in self.users[0].keys() if x not in
982                  ('dn', 'objectclass')]
983         iteration = 0
984         for attr in attrs:
985             gte_order, expected_order, gte_map = \
986                                         self.get_gte_tests_and_order(attr)
987             # In case there is some order dependency, disorder tests
988             gte_tests = gte_order[:]
989             random.seed(1)
990             random.shuffle(gte_tests)
991
992             sort_control = "server_sort:1:0:%s" % attr
993             for before in [0, 1, 3]:
994                 for after in [0, 4]:
995                     for gte in gte_tests:
996                         vlv_search = encode_vlv_control(before=before,
997                                                         after=after,
998                                                         gte=get_bytes(gte))
999
1000                         res = self.ldb.search(self.ou,
1001                                               scope=ldb.SCOPE_ONELEVEL,
1002                                               attrs=[attr],
1003                                               controls=[sort_control,
1004                                                         vlv_search])
1005                         results = [x[attr][0] for x in res]
1006
1007                         # here offset is 0-based
1008                         offset = gte_map.get(gte, len(expected_order))
1009                         start = max(offset - before, 0)
1010                         end = offset + after + 1
1011                         expected_results = expected_order[start: end]
1012                         iteration += 1
1013                         if expected_results != results:
1014                             middle = expected_order[len(expected_order) // 2]
1015                             print(expected_results, results)
1016                             print(middle)
1017                             print(expected_order)
1018                             print()
1019                             print("\nattr %s offset %d before %d "
1020                                   "after %d gte %s" %
1021                                   (attr, offset, before, after, gte))
1022                         self.assertEquals(expected_results, results)
1023
1024     def test_multiple_searches(self):
1025         """The maximum number of concurrent vlv searches per connection is
1026         currently set at 3. That means if you open 4 VLV searches the
1027         cookie on the first one should fail.
1028         """
1029         # Windows has a limit of 10 VLVs where there are low numbers
1030         # of objects in each search.
1031         attrs = ([x for x in self.users[0].keys() if x not in
1032                   ('dn', 'objectclass')] * 2)[:12]
1033
1034         vlv_cookies = []
1035         for attr in attrs:
1036             sort_control = "server_sort:1:0:%s" % attr
1037
1038             res = self.ldb.search(self.ou,
1039                                   scope=ldb.SCOPE_ONELEVEL,
1040                                   attrs=[attr],
1041                                   controls=[sort_control,
1042                                             "vlv:1:1:1:1:0"])
1043
1044             cookie = get_cookie(res.controls, len(self.users))
1045             vlv_cookies.append(cookie)
1046             time.sleep(0.2)
1047
1048         # now this one should fail
1049         self.assertRaises(ldb.LdbError,
1050                           self.ldb.search,
1051                           self.ou,
1052                           scope=ldb.SCOPE_ONELEVEL,
1053                           attrs=[attr],
1054                           controls=[sort_control,
1055                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1056
1057         # and this one should succeed
1058         res = self.ldb.search(self.ou,
1059                               scope=ldb.SCOPE_ONELEVEL,
1060                               attrs=[attr],
1061                               controls=[sort_control,
1062                                         "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1063
1064         # this one should fail because it is a new connection and
1065         # doesn't share cookies
1066         new_ldb = SamDB(host, credentials=creds,
1067                         session_info=system_session(lp), lp=lp)
1068
1069         self.assertRaises(ldb.LdbError,
1070                           new_ldb.search, self.ou,
1071                           scope=ldb.SCOPE_ONELEVEL,
1072                           attrs=[attr],
1073                           controls=[sort_control,
1074                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1075
1076         # but now without the critical flag it just does no VLV.
1077         new_ldb.search(self.ou,
1078                        scope=ldb.SCOPE_ONELEVEL,
1079                        attrs=[attr],
1080                        controls=[sort_control,
1081                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1082
1083     # Run a vlv search and return important fields of the response control
1084     def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
1085         sort_ctrl = "server_sort:1:0:%s" % attr
1086         ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
1087         if cookie:
1088             ctrl += ":" + cookie
1089
1090         res = self.ldb.search(self.ou,
1091                               expression=expr,
1092                               scope=ldb.SCOPE_ONELEVEL,
1093                               attrs=[attr],
1094                               controls=[ctrl, sort_ctrl])
1095         results = [str(x[attr][0]) for x in res]
1096
1097         ctrls = [str(c) for c in res.controls if
1098                  str(c).startswith('vlv')]
1099         self.assertEqual(len(ctrls), 1)
1100
1101         spl = ctrls[0].rsplit(':')
1102         cookie = ""
1103         if len(spl) == 6:
1104             cookie = spl[-1]
1105
1106         return results, cookie
1107
1108     def test_vlv_modify_during_view(self):
1109         attr = 'roomNumber'
1110         expr = "(objectclass=user)"
1111
1112         # Start new search
1113         full_results, cookie = self.vlv_search(attr, expr,
1114                                                after_count=len(self.users))
1115
1116         # Edit a user
1117         edit_index = len(self.users)//2
1118         edit_attr = full_results[edit_index]
1119         users_with_attr = [u for u in self.users if u[attr] == edit_attr]
1120         self.assertEqual(len(users_with_attr), 1)
1121         edit_user = users_with_attr[0]
1122
1123         # Put z at the front of the val so it comes last in ordering
1124         edit_val = "z_" + edit_user[attr]
1125
1126         m = ldb.Message()
1127         m.dn = ldb.Dn(self.ldb, edit_user['dn'])
1128         m[attr] = ldb.MessageElement(edit_val, ldb.FLAG_MOD_REPLACE, attr)
1129         self.ldb.modify(m)
1130
1131         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1132                                           after_count=len(self.users))
1133
1134         # Make expected_results by copying and editing full_results
1135         expected_results = full_results[:]
1136         expected_results[edit_index] = edit_val
1137         self.assertEqual(results, expected_results)
1138
1139     # Test changing the search expression in a request on an initialised view
1140     # Expected failure on samba, passes on windows
1141     def test_vlv_change_search_expr(self):
1142         attr = 'roomNumber'
1143         expr = "(objectclass=user)"
1144
1145         # Start new search
1146         full_results, cookie = self.vlv_search(attr, expr,
1147                                                after_count=len(self.users))
1148
1149         middle_index = len(full_results)//2
1150         # Search that excludes the old value but includes the new one
1151         expr = "%s>=%s" % (attr, full_results[middle_index])
1152         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1153                                           after_count=len(self.users))
1154         self.assertEqual(results, full_results[middle_index:])
1155
1156     # Check you can't add a value to a vlv view
1157     def test_vlv_add_during_view(self):
1158         attr = 'roomNumber'
1159         expr = "(objectclass=user)"
1160
1161         # Start new search
1162         full_results, cookie = self.vlv_search(attr, expr,
1163                                                after_count=len(self.users))
1164
1165         # Add a user at the end of the sort order
1166         add_val = "z_addedval"
1167         user = {'cn': add_val, "objectclass": "user", attr: add_val}
1168         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1169         self.ldb.add(user)
1170
1171         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1172                                           after_count=len(self.users)+1)
1173         self.assertEqual(results, full_results)
1174
1175     def test_vlv_delete_during_view(self):
1176         attr = 'roomNumber'
1177         expr = "(objectclass=user)"
1178
1179         # Start new search
1180         full_results, cookie = self.vlv_search(attr, expr,
1181                                                after_count=len(self.users))
1182
1183         # Delete one of the users
1184         del_index = len(self.users)//2
1185         del_user = self.users[del_index]
1186         self.ldb.delete(del_user['dn'])
1187
1188         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1189                                           after_count=len(self.users))
1190         expected_results = [r for r in full_results if r != del_user[attr]]
1191         self.assertEqual(results, expected_results)
1192
1193
1194 class PagedResultsTests(TestsWithUserOU):
1195
1196     def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
1197                      attrs=None, ou=None, subtree=False):
1198         ou = ou or self.ou
1199         if cookie:
1200             cookie = ":" + cookie
1201         ctrl = "paged_results:1:" + str(page_size) + cookie
1202         controls = [ctrl]
1203
1204         # If extra controls are provided then add them, else default to
1205         # sort control on 'cn' attribute
1206         if extra_ctrls is not None:
1207             controls += extra_ctrls
1208         else:
1209             sort_ctrl = "server_sort:1:0:cn"
1210             controls.append(sort_ctrl)
1211
1212         kwargs = {}
1213         if attrs is not None:
1214             kwargs = {"attrs": attrs}
1215
1216         scope = ldb.SCOPE_ONELEVEL
1217         if subtree:
1218             scope = ldb.SCOPE_SUBTREE
1219
1220         res = self.ldb.search(ou,
1221                               expression=expr,
1222                               scope=scope,
1223                               controls=controls,
1224                               **kwargs)
1225         results = [str(r['cn'][0]) for r in res]
1226
1227         ctrls = [str(c) for c in res.controls if
1228                  str(c).startswith("paged_results")]
1229         assert len(ctrls) == 1, "no paged_results response"
1230
1231         spl = ctrls[0].rsplit(':', 3)
1232         cookie = ""
1233         if len(spl) == 3:
1234             cookie = spl[-1]
1235         return results, cookie
1236
1237     def test_paged_delete_during_search(self):
1238         expr = "(objectClass=*)"
1239
1240         # Start new search
1241         first_page_size = 3
1242         results, cookie = self.paged_search(expr, page_size=first_page_size)
1243
1244         # Run normal search to get expected results
1245         unedited_results, _ = self.paged_search(expr,
1246                                                 page_size=len(self.users))
1247
1248         # Get remaining users not returned by the search above
1249         unreturned_users = [u for u in self.users if u['cn'] not in results]
1250
1251         # Delete one of the users
1252         del_index = len(self.users)//2
1253         del_user = unreturned_users[del_index]
1254         self.ldb.delete(del_user['dn'])
1255
1256         # Run test
1257         results, _ = self.paged_search(expr, cookie=cookie,
1258                                        page_size=len(self.users))
1259         expected_results = [r for r in unedited_results[first_page_size:]
1260                             if r != del_user['cn']]
1261         self.assertEqual(results, expected_results)
1262
1263     def test_paged_show_deleted(self):
1264         unique = time.strftime("%s", time.gmtime())[-5:]
1265         prefix = "show_deleted_test_%s_" % (unique)
1266         expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
1267         del_ctrl = "show_deleted:1"
1268
1269         num_users = 10
1270         users = []
1271         for i in range(num_users):
1272             user = self.create_user(i, num_users, prefix=prefix)
1273             users.append(user)
1274
1275         first_user = users[0]
1276         self.ldb.delete(first_user['dn'])
1277
1278         # Start new search
1279         first_page_size = 3
1280         results, cookie = self.paged_search(expr, page_size=first_page_size,
1281                                             extra_ctrls=[del_ctrl],
1282                                             ou=self.base_dn,
1283                                             subtree=True)
1284
1285         # Get remaining users not returned by the search above
1286         unreturned_users = [u for u in users if u['cn'] not in results]
1287
1288         # Delete one of the users
1289         del_index = len(users)//2
1290         del_user = unreturned_users[del_index]
1291         self.ldb.delete(del_user['dn'])
1292
1293         results2, _ = self.paged_search(expr, cookie=cookie,
1294                                         page_size=len(users)*2,
1295                                         extra_ctrls=[del_ctrl],
1296                                         ou=self.base_dn,
1297                                         subtree=True)
1298
1299         user_cns = {str(u['cn']) for u in users}
1300         deleted_cns = {first_user['cn'], del_user['cn']}
1301
1302         all_results = results + results2
1303         normal_results = {r for r in all_results if "DEL:" not in r}
1304         self.assertEqual(normal_results, user_cns - deleted_cns)
1305
1306         # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
1307         deleted_results = {r[:r.index('\n')] for r in all_results
1308                            if "DEL:" in r}
1309         self.assertEqual(deleted_results, deleted_cns)
1310
1311     def test_paged_add_during_search(self):
1312         expr = "(objectClass=*)"
1313
1314         # Start new search
1315         first_page_size = 3
1316         results, cookie = self.paged_search(expr, page_size=first_page_size)
1317
1318         unedited_results, _ = self.paged_search(expr,
1319                                                 page_size=len(self.users)+1)
1320
1321         # Get remaining users not returned by the search above
1322         unwalked_users = [cn for cn in unedited_results if cn not in results]
1323
1324         # Add a user in the middle of the sort order
1325         middle_index = len(unwalked_users)//2
1326         middle_user = unwalked_users[middle_index]
1327
1328         user = {'cn': middle_user + '_2', "objectclass": "user"}
1329         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1330         self.ldb.add(user)
1331
1332         results, _ = self.paged_search(expr, cookie=cookie,
1333                                        page_size=len(self.users)+1)
1334         expected_results = unwalked_users[:]
1335
1336         # Uncomment this line to assert that adding worked.
1337         # expected_results.insert(middle_index+1, user['cn'])
1338
1339         self.assertEqual(results, expected_results)
1340
1341     def test_paged_modify_during_search(self):
1342         expr = "(objectClass=*)"
1343
1344         # Start new search
1345         first_page_size = 3
1346         results, cookie = self.paged_search(expr, page_size=first_page_size)
1347
1348         unedited_results, _ = self.paged_search(expr,
1349                                                 page_size=len(self.users)+1)
1350
1351         # Modify user in the middle of the remaining sort order
1352         unwalked_users = [cn for cn in unedited_results if cn not in results]
1353         middle_index = len(unwalked_users)//2
1354         middle_cn = unwalked_users[middle_index]
1355
1356         # Find user object
1357         users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1358         self.assertEqual(len(users_with_middle_cn), 1)
1359         middle_user = users_with_middle_cn[0]
1360
1361         # Rename object
1362         edit_cn = "z_" + middle_cn
1363         new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
1364         self.ldb.rename(middle_user['dn'], new_dn)
1365
1366         results, _ = self.paged_search(expr, cookie=cookie,
1367                                        page_size=len(self.users)+1)
1368         expected_results = unwalked_users[:]
1369         expected_results[middle_index] = edit_cn
1370         self.assertEqual(results, expected_results)
1371
1372     def test_paged_modify_object_scope(self):
1373         expr = "(objectClass=*)"
1374
1375         ou2 = "OU=vlvtestou2,%s" % (self.tree_dn)
1376         self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
1377
1378         # Do a separate, full search to get all results
1379         unedited_results, _ = self.paged_search(expr,
1380                                                 page_size=len(self.users)+1)
1381
1382         # Rename before starting a search
1383         first_cn = self.users[0]['cn']
1384         new_dn = "CN=%s,%s" % (first_cn, ou2)
1385         self.ldb.rename(self.users[0]['dn'], new_dn)
1386
1387         # Start new search under the original OU
1388         first_page_size = 3
1389         results, cookie = self.paged_search(expr, page_size=first_page_size)
1390         self.assertEqual(results, unedited_results[1:1+first_page_size])
1391
1392         # Get one of the users that is yet to be returned
1393         unwalked_users = [cn for cn in unedited_results if cn not in results]
1394         middle_index = len(unwalked_users)//2
1395         middle_cn = unwalked_users[middle_index]
1396
1397         # Find user object
1398         users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1399         self.assertEqual(len(users_with_middle_cn), 1)
1400         middle_user = users_with_middle_cn[0]
1401
1402         # Rename
1403         new_dn = "CN=%s,%s" % (middle_cn, ou2)
1404         self.ldb.rename(middle_user['dn'], new_dn)
1405
1406         results, _ = self.paged_search(expr, cookie=cookie,
1407                                        page_size=len(self.users)+1)
1408
1409         expected_results = unwalked_users[:]
1410
1411         # We should really expect that the object renamed into a different
1412         # OU should vanish from the results, but turns out Windows does return
1413         # the object in this case.  Our module matches the Windows behaviour.
1414
1415         # If behaviour changes, this line inverts the test's expectations to
1416         # what you might expect.
1417         # del expected_results[middle_index]
1418
1419         # But still expect the user we removed before the search to be gone
1420         del expected_results[0]
1421
1422         self.assertEqual(results, expected_results)
1423
1424     def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
1425                                 extra_ctrls=None):
1426         try:
1427             results, _ = self.paged_search(expr, cookie=cookie,
1428                                            page_size=2,
1429                                            extra_ctrls=extra_ctrls,
1430                                            attrs=attrs)
1431         except ldb.LdbError as e:
1432             self.assertEqual(e.args[0], err_num)
1433             return
1434
1435         self.fail("No error raised by invalid search")
1436
1437     def test_paged_changed_expr(self):
1438         # Initiate search then use a different expr in subsequent req
1439         expr = "(objectClass=*)"
1440         results, cookie = self.paged_search(expr, page_size=3)
1441         expr = "cn>=a"
1442         expected_error_num = 12
1443         self.assertPagedSearchRaises(expected_error_num, expr, cookie)
1444
1445     def test_paged_changed_controls(self):
1446         expr = "(objectClass=*)"
1447         sort_ctrl = "server_sort:1:0:cn"
1448         del_ctrl = "show_deleted:1"
1449         expected_error_num = 12
1450         ps = 3
1451
1452         # Initiate search with a sort control then remove in subsequent req
1453         results, cookie = self.paged_search(expr, page_size=ps,
1454                                             extra_ctrls=[sort_ctrl])
1455         self.assertPagedSearchRaises(expected_error_num, expr,
1456                                      cookie, extra_ctrls=[])
1457
1458         # Initiate search with no sort control then add one in subsequent req
1459         results, cookie = self.paged_search(expr, page_size=ps,
1460                                             extra_ctrls=[])
1461         self.assertPagedSearchRaises(expected_error_num, expr,
1462                                      cookie, extra_ctrls=[sort_ctrl])
1463
1464         # Initiate search with show-deleted control then
1465         # remove it in subsequent req
1466         results, cookie = self.paged_search(expr, page_size=ps,
1467                                             extra_ctrls=[del_ctrl])
1468         self.assertPagedSearchRaises(expected_error_num, expr,
1469                                      cookie, extra_ctrls=[])
1470
1471         # Initiate normal search then add show-deleted control
1472         # in subsequent req
1473         results, cookie = self.paged_search(expr, page_size=ps,
1474                                             extra_ctrls=[])
1475         self.assertPagedSearchRaises(expected_error_num, expr,
1476                                      cookie, extra_ctrls=[del_ctrl])
1477
1478         # Changing order of controls shouldn't break the search
1479         results, cookie = self.paged_search(expr, page_size=ps,
1480                                             extra_ctrls=[del_ctrl, sort_ctrl])
1481         try:
1482             results, cookie = self.paged_search(expr, page_size=ps,
1483                                                 extra_ctrls=[sort_ctrl,
1484                                                              del_ctrl])
1485         except ldb.LdbError as e:
1486             self.fail(e)
1487
1488     def test_paged_cant_change_controls_data(self):
1489         # Some defaults for the rest of the tests
1490         expr = "(objectClass=*)"
1491         sort_ctrl = "server_sort:1:0:cn"
1492         expected_error_num = 12
1493
1494         # Initiate search with sort control then change it in subsequent req
1495         results, cookie = self.paged_search(expr, page_size=3,
1496                                             extra_ctrls=[sort_ctrl])
1497         changed_sort_ctrl = "server_sort:1:0:roomNumber"
1498         self.assertPagedSearchRaises(expected_error_num, expr,
1499                                      cookie, extra_ctrls=[changed_sort_ctrl])
1500
1501         # Initiate search with a control with crit=1, then use crit=0
1502         results, cookie = self.paged_search(expr, page_size=3,
1503                                             extra_ctrls=[sort_ctrl])
1504         changed_sort_ctrl = "server_sort:0:0:cn"
1505         self.assertPagedSearchRaises(expected_error_num, expr,
1506                                      cookie, extra_ctrls=[changed_sort_ctrl])
1507
1508     def test_paged_search_referrals(self):
1509         expr = "(objectClass=*)"
1510         paged_ctrl = "paged_results:1:5"
1511         res = self.ldb.search(self.base_dn,
1512                               expression=expr,
1513                               attrs=['cn'],
1514                               scope=ldb.SCOPE_SUBTREE,
1515                               controls=[paged_ctrl])
1516
1517         # Do a paged search walk over the whole database and save a list
1518         # of all the referrals returned by each search.
1519         referral_lists = []
1520
1521         while True:
1522             referral_lists.append(res.referals)
1523
1524             ctrls = [str(c) for c in res.controls if
1525                      str(c).startswith("paged_results")]
1526             self.assertEqual(len(ctrls), 1)
1527             spl = ctrls[0].rsplit(':')
1528             if len(spl) != 3:
1529                 break
1530
1531             cookie = spl[-1]
1532             res = self.ldb.search(self.base_dn,
1533                                   expression=expr,
1534                                   attrs=['cn'],
1535                                   scope=ldb.SCOPE_SUBTREE,
1536                                   controls=[paged_ctrl + ":" + cookie])
1537
1538         ref_list = referral_lists[0]
1539
1540         # Sanity check to make sure the search actually did something
1541         self.assertGreater(len(referral_lists), 2)
1542
1543         # Check the first referral set contains stuff
1544         self.assertGreater(len(ref_list), 0)
1545
1546         # Check the others don't
1547         self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
1548
1549         # Check the entries in the first referral list look like referrals
1550         self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
1551
1552     def test_paged_change_attrs(self):
1553         expr = "(objectClass=*)"
1554         attrs = ['cn']
1555         expected_error_num = 12
1556
1557         results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
1558         results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
1559                                             attrs=attrs)
1560
1561         changed_attrs = attrs + ['roomNumber']
1562         self.assertPagedSearchRaises(expected_error_num, expr,
1563                                      cookie, attrs=changed_attrs,
1564                                      extra_ctrls=[])
1565
1566     def test_paged_search_lockstep(self):
1567         expr = "(objectClass=*)"
1568         ps = 3
1569
1570         all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
1571
1572         # Run two different but overlapping paged searches simultaneously.
1573         set_1_index = int((len(all_results))//3)
1574         set_2_index = int((2*len(all_results))//3)
1575         set_1 = all_results[set_1_index:]
1576         set_2 = all_results[:set_2_index+1]
1577         set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
1578         set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
1579
1580         results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
1581         self.assertEqual(results, set_1[:ps])
1582         results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
1583         self.assertEqual(results, set_2[:ps])
1584
1585         results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
1586                                              page_size=ps)
1587         self.assertEqual(results, set_1[ps:ps*2])
1588         results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
1589                                              page_size=ps)
1590         self.assertEqual(results, set_2[ps:ps*2])
1591
1592         results, _ = self.paged_search(set_1_expr, cookie=cookie1,
1593                                        page_size=len(self.users))
1594         self.assertEqual(results, set_1[ps*2:])
1595         results, _ = self.paged_search(set_2_expr, cookie=cookie2,
1596                                        page_size=len(self.users))
1597         self.assertEqual(results, set_2[ps*2:])
1598
1599
1600 if "://" not in host:
1601     if os.path.isfile(host):
1602         host = "tdb://%s" % host
1603     else:
1604         host = "ldap://%s" % host
1605
1606
1607 TestProgram(module=__name__, opts=subunitopts)