Fix PEP8 warning E225 missing whitespace around operator
[vlendec/samba-autobuild/.git] / source4 / torture / drs / python / getncchanges.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Catalyst.Net Ltd. 2017
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 from __future__ import print_function
31 import drs_base
32 import samba.tests
33 import ldb
34 from ldb import SCOPE_BASE
35 import random
36
37 from samba.dcerpc import drsuapi
38
39 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
40     def setUp(self):
41         super(DrsReplicaSyncIntegrityTestCase, self).setUp()
42
43         self.init_test_state()
44
45         # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
46         # the vampire_dc), so we point this test directly at that DC
47         self.set_test_ldb_dc(self.ldb_dc2)
48
49         self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
50                                                  "getncchanges"))
51         self.base_dn = self.test_ldb_dc.get_default_basedn()
52
53         self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
54         self.set_dc_connection(self.default_conn)
55
56     def tearDown(self):
57         super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
58         # tidyup groups and users
59         try:
60             self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
61         except ldb.LdbError as e:
62             (enum, string) = e.args
63             if enum == ldb.ERR_NO_SUCH_OBJECT:
64                 pass
65
66     def init_test_state(self):
67         self.rxd_dn_list = []
68         self.rxd_links = []
69         self.rxd_guids = []
70         self.last_ctr = None
71
72         # 100 is the minimum max_objects that Microsoft seems to honour
73         # (the max honoured is 400ish), so we use that in these tests
74         self.max_objects = 100
75
76         # store whether we used GET_TGT/GET_ANC flags in the requests
77         self.used_get_tgt = False
78         self.used_get_anc = False
79
80     def add_object(self, dn, objectclass="organizationalunit"):
81         """Adds an OU object"""
82         self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
83         res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
84         self.assertEquals(len(res), 1)
85
86     def modify_object(self, dn, attr, value):
87         """Modifies an object's USN by adding an attribute value to it"""
88         m = ldb.Message()
89         m.dn = ldb.Dn(self.test_ldb_dc, dn)
90         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
91         self.test_ldb_dc.modify(m)
92
93     def delete_attribute(self, dn, attr, value):
94         """Deletes an attribute from an object"""
95         m = ldb.Message()
96         m.dn = ldb.Dn(self.test_ldb_dc, dn)
97         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
98         self.test_ldb_dc.modify(m)
99
100     def start_new_repl_cycle(self):
101         """Resets enough state info to start a new replication cycle"""
102         # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
103         # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve
104         self.rxd_links = []
105
106         self.used_get_tgt = False
107         self.used_get_anc = False
108         # mostly preserve self.last_ctr, so that we use the last HWM
109         if self.last_ctr is not None:
110             self.last_ctr.more_data = True
111
112     def create_object_range(self, start, end, prefix="",
113                             children=None, parent_list=None):
114         """
115         Creates a block of objects. Object names are numbered sequentially,
116         using the optional prefix supplied. If the children parameter is
117         supplied it will create a parent-child hierarchy and return the
118         top-level parents separately.
119         """
120         dn_list = []
121
122         # Use dummy/empty lists if we're not creating a parent/child hierarchy
123         if children is None:
124             children = []
125
126         if parent_list is None:
127             parent_list = []
128
129         # Create the parents first, then the children.
130         # This makes it easier to see in debug when GET_ANC takes effect
131         # because the parent/children become interleaved (by default,
132         # this approach means the objects are organized into blocks of
133         # parents and blocks of children together)
134         for x in range(start, end):
135             ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
136             self.add_object(ou)
137             dn_list.append(ou)
138
139             # keep track of the top-level parents (if needed)
140             parent_list.append(ou)
141
142         # create the block of children (if needed)
143         for x in range(start, end):
144             for child in children:
145                 ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
146                 self.add_object(ou)
147                 dn_list.append(ou)
148
149         return dn_list
150
151     def assert_expected_data(self, expected_list):
152         """
153         Asserts that we received all the DNs that we expected and
154         none are missing.
155         """
156         received_list = self.rxd_dn_list
157
158         # Note that with GET_ANC Windows can end up sending the same parent
159         # object multiple times, so this might be noteworthy but doesn't
160         # warrant failing the test
161         if (len(received_list) != len(expected_list)):
162             print("Note: received %d objects but expected %d" % (len(received_list),
163                                                                  len(expected_list)))
164
165         # Check that we received every object that we were expecting
166         for dn in expected_list:
167             self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
168
169     def test_repl_integrity(self):
170         """
171         Modify the objects being replicated while the replication is still
172         in progress and check that no object loss occurs.
173         """
174
175         # The server behaviour differs between samba and Windows. Samba returns
176         # the objects in the original order (up to the pre-modify HWM). Windows
177         # incorporates the modified objects and returns them in the new order
178         # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
179         # docs state the Windows behaviour is optional.
180
181         # Create a range of objects to replicate.
182         expected_dn_list = self.create_object_range(0, 400)
183         (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
184
185         # We ask for the first page of 100 objects.
186         # For this test, we don't care what order we receive the objects in,
187         # so long as by the end we've received everything
188         self.repl_get_next()
189
190         # Modify some of the second page of objects. This should bump the highwatermark
191         for x in range(100, 200):
192             self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
193
194         (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
195         self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
196
197         # Get the remaining blocks of data
198         while not self.replication_complete():
199             self.repl_get_next()
200
201         # Check we still receive all the objects we're expecting
202         self.assert_expected_data(expected_dn_list)
203
204     def is_parent_known(self, dn, known_dn_list):
205         """
206         Returns True if the parent of the dn specified is in known_dn_list
207         """
208
209         # we can sometimes get system objects like the RID Manager returned.
210         # Ignore anything that is not under the test OU we created
211         if self.ou not in dn:
212             return True
213
214         # Remove the child portion from the name to get the parent's DN
215         name_substrings = dn.split(",")
216         del name_substrings[0]
217
218         parent_dn = ",".join(name_substrings)
219
220         # check either this object is a parent (it's parent is the top-level
221         # test object), or its parent has been seen previously
222         return parent_dn == self.ou or parent_dn in known_dn_list
223
224     def _repl_send_request(self, get_anc=False, get_tgt=False):
225         """Sends a GetNCChanges request for the next block of replication data."""
226
227         # we're just trying to mimic regular client behaviour here, so just
228         # use the highwatermark in the last response we received
229         if self.last_ctr:
230             highwatermark = self.last_ctr.new_highwatermark
231             uptodateness_vector = self.last_ctr.uptodateness_vector
232         else:
233             # this is the first replication chunk
234             highwatermark = None
235             uptodateness_vector = None
236
237         # Ask for the next block of replication data
238         replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
239         more_flags = 0
240
241         if get_anc:
242             replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC
243             self.used_get_anc = True
244
245         if get_tgt:
246             more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
247             self.used_get_tgt = True
248
249         # return the response from the DC
250         return self._get_replication(replica_flags,
251                                      max_objects=self.max_objects,
252                                      highwatermark=highwatermark,
253                                      uptodateness_vector=uptodateness_vector,
254                                      more_flags=more_flags)
255
256     def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
257         """
258         Requests the next block of replication data. This tries to simulate
259         client behaviour - if we receive a replicated object that we don't know
260         the parent of, then re-request the block with the GET_ANC flag set.
261         If we don't know the target object for a linked attribute, then
262         re-request with GET_TGT.
263         """
264
265         # send a request to the DC and get the response
266         ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
267
268         # extract the object DNs and their GUIDs from the response
269         rxd_dn_list = self._get_ctr6_dn_list(ctr6)
270         rxd_guid_list = self._get_ctr6_object_guids(ctr6)
271
272         # we'll add new objects as we discover them, so take a copy of the
273         # ones we already know about, so we can modify these lists safely
274         known_objects = self.rxd_dn_list[:]
275         known_guids = self.rxd_guids[:]
276
277         # check that we know the parent for every object received
278         for i in range(0, len(rxd_dn_list)):
279
280             dn = rxd_dn_list[i]
281             guid = rxd_guid_list[i]
282
283             if self.is_parent_known(dn, known_objects):
284
285                 # the new DN is now known so add it to the list.
286                 # It may be the parent of another child in this block
287                 known_objects.append(dn)
288                 known_guids.append(guid)
289             else:
290                 # If we've already set the GET_ANC flag then it should mean
291                 # we receive the parents before the child
292                 self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
293
294                 print("Unknown parent for %s - try GET_ANC" % dn)
295
296                 # try the same thing again with the GET_ANC flag set this time
297                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
298                                           assert_links=assert_links)
299
300         # check we know about references to any objects in the linked attritbutes
301         received_links = self._get_ctr6_links(ctr6)
302
303         # This is so that older versions of Samba fail - we want the links to be
304         # sent roughly with the objects, rather than getting all links at the end
305         if assert_links:
306             self.assertTrue(len(received_links) > 0,
307                             "Links were expected in the GetNCChanges response")
308
309         for link in received_links:
310
311             # skip any links that aren't part of the test
312             if self.ou not in link.targetDN:
313                 continue
314
315             # check the source object is known (Windows can actually send links
316             # where we don't know the source object yet). Samba shouldn't ever
317             # hit this case because it gets the links based on the source
318             if link.identifier not in known_guids:
319
320                 # If we've already set the GET_ANC flag then it should mean
321                 # this case doesn't happen
322                 self.assertFalse(get_anc, "Unknown source object for GUID %s"
323                                  % link.identifier)
324
325                 print("Unknown source GUID %s - try GET_ANC" % link.identifier)
326
327                 # try the same thing again with the GET_ANC flag set this time
328                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
329                                           assert_links=assert_links)
330
331             # check we know the target object
332             if link.targetGUID not in known_guids:
333
334                 # If we've already set the GET_TGT flag then we should have
335                 # already received any objects we need to know about
336                 self.assertFalse(get_tgt, "Unknown linked target for object %s"
337                                  % link.targetDN)
338
339                 print("Unknown target for %s - try GET_TGT" % link.targetDN)
340
341                 # try the same thing again with the GET_TGT flag set this time
342                 return self.repl_get_next(get_anc=get_anc, get_tgt=True,
343                                           assert_links=assert_links)
344
345         # store the last successful result so we know what HWM to request next
346         self.last_ctr = ctr6
347
348         # store the objects, GUIDs, and links we received
349         self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
350         self.rxd_links += self._get_ctr6_links(ctr6)
351         self.rxd_guids += self._get_ctr6_object_guids(ctr6)
352
353         return ctr6
354
355     def replication_complete(self):
356         """Returns True if the current/last replication cycle is complete"""
357
358         if self.last_ctr is None or self.last_ctr.more_data:
359             return False
360         else:
361             return True
362
363     def test_repl_integrity_get_anc(self):
364         """
365         Modify the parent objects being replicated while the replication is still
366         in progress (using GET_ANC) and check that no object loss occurs.
367         """
368
369         # Note that GET_ANC behaviour varies between Windows and Samba.
370         # On Samba GET_ANC results in the replication restarting from the very
371         # beginning. After that, Samba remembers GET_ANC and also sends the
372         # parents in subsequent requests (regardless of whether GET_ANC is
373         # specified in the later request).
374         # Windows only sends the parents if GET_ANC was specified in the last
375         # request. It will also resend a parent, even if it's already sent the
376         # parent in a previous response (whereas Samba doesn't).
377
378         # Create a small block of 50 parents, each with 2 children (A and B)
379         # This is so that we receive some children in the first block, so we
380         # can resend with GET_ANC before we learn too many parents
381         parent_dn_list = []
382         expected_dn_list = self.create_object_range(0, 50, prefix="parent",
383                                                     children=("A", "B"),
384                                                     parent_list=parent_dn_list)
385
386         # create the remaining parents and children
387         expected_dn_list += self.create_object_range(50, 150, prefix="parent",
388                                                      children=("A", "B"),
389                                                      parent_list=parent_dn_list)
390
391         # We've now got objects in the following order:
392         # [50 parents][100 children][100 parents][200 children]
393
394         # Modify the first parent so that it's now ordered last by USN
395         # This means we set the GET_ANC flag pretty much straight away
396         # because we receive the first child before the first parent
397         self.modify_object(parent_dn_list[0], "displayName", "OU0")
398
399         # modify a later block of parents so they also get reordered
400         for x in range(50, 100):
401             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
402
403         # Get the first block of objects - this should resend the request with
404         # GET_ANC set because we won't know about the first child's parent.
405         # On samba GET_ANC essentially starts the sync from scratch again, so
406         # we get this over with early before we learn too many parents
407         self.repl_get_next()
408
409         # modify the last chunk of parents. They should now have a USN higher
410         # than the highwater-mark for the replication cycle
411         for x in range(100, 150):
412             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
413
414         # Get the remaining blocks of data - this will resend the request with
415         # GET_ANC if it encounters an object it doesn't have the parent for.
416         while not self.replication_complete():
417             self.repl_get_next()
418
419         # The way the test objects have been created should force
420         # self.repl_get_next() to use the GET_ANC flag. If this doesn't
421         # actually happen, then the test isn't doing its job properly
422         self.assertTrue(self.used_get_anc,
423                         "Test didn't use the GET_ANC flag as expected")
424
425         # Check we get all the objects we're expecting
426         self.assert_expected_data(expected_dn_list)
427
428     def assert_expected_links(self, objects_with_links, link_attr="managedBy",
429                               num_expected=None):
430         """
431         Asserts that a GetNCChanges response contains any expected links
432         for the objects it contains.
433         """
434         received_links = self.rxd_links
435
436         if num_expected is None:
437             num_expected = len(objects_with_links)
438
439         self.assertTrue(len(received_links) == num_expected,
440                         "Received %d links but expected %d"
441                         % (len(received_links), num_expected))
442
443         for dn in objects_with_links:
444             self.assert_object_has_link(dn, link_attr, received_links)
445
446     def assert_object_has_link(self, dn, link_attr, received_links):
447         """
448         Queries the object in the DB and asserts there is a link in the
449         GetNCChanges response that matches.
450         """
451
452         # Look up the link attribute in the DB
453         # The extended_dn option will dump the GUID info for the link
454         # attribute (as a hex blob)
455         res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr],
456                                       controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE)
457
458         # We didn't find the expected link attribute in the DB for the object.
459         # Something has gone wrong somewhere...
460         self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s"
461                         % (dn, link_attr))
462
463         # find the received link in the list and assert that the target and
464         # source GUIDs match what's in the DB
465         for val in res[0][link_attr]:
466             # Work out the expected source and target GUIDs for the DB link
467             target_dn = ldb.Dn(self.test_ldb_dc, val)
468             targetGUID_blob = target_dn.get_extended_component("GUID")
469             sourceGUID_blob = res[0].dn.get_extended_component("GUID")
470
471             found = False
472
473             for link in received_links:
474                 if link.selfGUID_blob == sourceGUID_blob and \
475                    link.targetGUID_blob == targetGUID_blob:
476
477                     found = True
478
479                     if self._debug:
480                         print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
481                     break
482
483             self.assertTrue(found, "Did not receive expected link for DN %s" % dn)
484
485     def test_repl_get_tgt(self):
486         """
487         Creates a scenario where we should receive the linked attribute before
488         we know about the target object, and therefore need to use GET_TGT.
489         Note: Samba currently avoids this problem by sending all its links last
490         """
491
492         # create the test objects
493         reportees = self.create_object_range(0, 100, prefix="reportee")
494         managers = self.create_object_range(0, 100, prefix="manager")
495         all_objects = managers + reportees
496         expected_links = reportees
497
498         # add a link attribute to each reportee object that points to the
499         # corresponding manager object as the target
500         for i in range(0, 100):
501             self.modify_object(reportees[i], "managedBy", managers[i])
502
503         # touch the managers (the link-target objects) again to make sure the
504         # reportees (link source objects) get returned first by the replication
505         for i in range(0, 100):
506             self.modify_object(managers[i], "displayName", "OU%d" % i)
507
508         links_expected = True
509
510         # Get all the replication data - this code should resend the requests
511         # with GET_TGT
512         while not self.replication_complete():
513
514             # get the next block of replication data (this sets GET_TGT if needed)
515             self.repl_get_next(assert_links=links_expected)
516             links_expected = len(self.rxd_links) < len(expected_links)
517
518         # The way the test objects have been created should force
519         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
520         # actually happen, then the test isn't doing its job properly
521         self.assertTrue(self.used_get_tgt,
522                         "Test didn't use the GET_TGT flag as expected")
523
524         # Check we get all the objects we're expecting
525         self.assert_expected_data(all_objects)
526
527         # Check we received links for all the reportees
528         self.assert_expected_links(expected_links)
529
530     def test_repl_get_tgt_chain(self):
531         """
532         Tests the behaviour of GET_TGT with a more complicated scenario.
533         Here we create a chain of objects linked together, so if we follow
534         the link target, then we'd traverse ~200 objects each time.
535         """
536
537         # create the test objects
538         objectsA = self.create_object_range(0, 100, prefix="AAA")
539         objectsB = self.create_object_range(0, 100, prefix="BBB")
540         objectsC = self.create_object_range(0, 100, prefix="CCC")
541
542         # create a complex set of object links:
543         #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
544         # Basically each object-A should link to a circular chain of 200 B/C
545         # objects. We create the links in separate chunks here, as it makes it
546         # clearer what happens with the USN (links on Windows have their own
547         # USN, so this approach means the A->B/B->C links aren't interleaved)
548         for i in range(0, 100):
549             self.modify_object(objectsA[i], "managedBy", objectsB[i])
550
551         for i in range(0, 100):
552             self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100])
553
554         for i in range(0, 100):
555             self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100])
556
557         all_objects = objectsA + objectsB + objectsC
558         expected_links = all_objects
559
560         # the default order the objects now get returned in should be:
561         # [A0-A99][B0-B99][C0-C99]
562
563         links_expected = True
564
565         # Get all the replication data - this code should resend the requests
566         # with GET_TGT
567         while not self.replication_complete():
568
569             # get the next block of replication data (this sets GET_TGT if needed)
570             self.repl_get_next(assert_links=links_expected)
571             links_expected = len(self.rxd_links) < len(expected_links)
572
573         # The way the test objects have been created should force
574         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
575         # actually happen, then the test isn't doing its job properly
576         self.assertTrue(self.used_get_tgt,
577                         "Test didn't use the GET_TGT flag as expected")
578
579         # Check we get all the objects we're expecting
580         self.assert_expected_data(all_objects)
581
582         # Check we received links for all the reportees
583         self.assert_expected_links(expected_links)
584
585     def test_repl_integrity_link_attr(self):
586         """
587         Tests adding links to new objects while a replication is in progress.
588         """
589
590         # create some source objects for the linked attributes, sandwiched
591         # between 2 blocks of filler objects
592         filler = self.create_object_range(0, 100, prefix="filler")
593         reportees = self.create_object_range(0, 100, prefix="reportee")
594         filler += self.create_object_range(100, 200, prefix="filler")
595
596         # Start the replication and get the first block of filler objects
597         # (We're being mean here and setting the GET_TGT flag right from the
598         # start. On earlier Samba versions, if the client encountered an
599         # unknown target object and retried with GET_TGT, it would restart the
600         # replication cycle from scratch, which avoids the problem).
601         self.repl_get_next(get_tgt=True)
602
603         # create the target objects and add the links. These objects should be
604         # outside the scope of the Samba replication cycle, but the links should
605         # still get sent with the source object
606         managers = self.create_object_range(0, 100, prefix="manager")
607
608         for i in range(0, 100):
609             self.modify_object(reportees[i], "managedBy", managers[i])
610
611         expected_objects = managers + reportees + filler
612         expected_links = reportees
613
614         # complete the replication
615         while not self.replication_complete():
616             self.repl_get_next(get_tgt=True)
617
618         # If we didn't receive the most recently created objects in the last
619         # replication cycle, then kick off another replication to get them
620         if len(self.rxd_dn_list) < len(expected_objects):
621             self.repl_get_next()
622
623             while not self.replication_complete():
624                 self.repl_get_next()
625
626         # Check we get all the objects we're expecting
627         self.assert_expected_data(expected_objects)
628
629         # Check we received links for all the parents
630         self.assert_expected_links(expected_links)
631
632     def test_repl_get_anc_link_attr(self):
633         """
634         A basic GET_ANC test where the parents have linked attributes
635         """
636
637         # Create a block of 100 parents and 100 children
638         parent_dn_list = []
639         expected_dn_list = self.create_object_range(0, 100, prefix="parent",
640                                                     children=("A"),
641                                                     parent_list=parent_dn_list)
642
643         # Add links from the parents to the children
644         for x in range(0, 100):
645             self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100])
646
647         # add some filler objects at the end. This allows us to easily see
648         # which chunk the links get sent in
649         expected_dn_list += self.create_object_range(0, 100, prefix="filler")
650
651         # We've now got objects in the following order:
652         # [100 x children][100 x parents][100 x filler]
653
654         # Get the replication data - because the block of children come first,
655         # this should retry the request with GET_ANC
656         while not self.replication_complete():
657             self.repl_get_next()
658
659         self.assertTrue(self.used_get_anc,
660                         "Test didn't use the GET_ANC flag as expected")
661
662         # Check we get all the objects we're expecting
663         self.assert_expected_data(expected_dn_list)
664
665         # Check we received links for all the parents
666         self.assert_expected_links(parent_dn_list)
667
668     def test_repl_get_tgt_and_anc(self):
669         """
670         Check we can resolve an unknown ancestor when fetching the link target,
671         i.e. tests using GET_TGT and GET_ANC in combination
672         """
673
674         # Create some parent/child objects (the child will be the link target)
675         parents = []
676         all_objects = self.create_object_range(0, 100, prefix="parent",
677                                                children=["la_tgt"],
678                                                parent_list=parents)
679
680         children = [item for item in all_objects if item not in parents]
681
682         # create the link source objects and link them to the child/target
683         la_sources = self.create_object_range(0, 100, prefix="la_src")
684         all_objects += la_sources
685
686         for i in range(0, 100):
687             self.modify_object(la_sources[i], "managedBy", children[i])
688
689         expected_links = la_sources
690
691         # modify the children/targets so they come after the link source
692         for x in range(0, 100):
693             self.modify_object(children[x], "displayName", "OU%d" % x)
694
695         # modify the parents, so they now come last in the replication
696         for x in range(0, 100):
697             self.modify_object(parents[x], "displayName", "OU%d" % x)
698
699         # We've now got objects in the following order:
700         # [100 la_source][100 la_target][100 parents (of la_target)]
701
702         links_expected = True
703
704         # Get all the replication data - this code should resend the requests
705         # with GET_TGT and GET_ANC
706         while not self.replication_complete():
707
708             # get the next block of replication data (this sets GET_TGT/GET_ANC)
709             self.repl_get_next(assert_links=links_expected)
710             links_expected = len(self.rxd_links) < len(expected_links)
711
712         # The way the test objects have been created should force
713         # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
714         # doesn't actually happen, then the test isn't doing its job properly
715         self.assertTrue(self.used_get_tgt,
716                         "Test didn't use the GET_TGT flag as expected")
717         self.assertTrue(self.used_get_anc,
718                         "Test didn't use the GET_ANC flag as expected")
719
720         # Check we get all the objects we're expecting
721         self.assert_expected_data(all_objects)
722
723         # Check we received links for all the link sources
724         self.assert_expected_links(expected_links)
725
726         # Second part of test. Add some extra objects and kick off another
727         # replication. The test code will use the HWM from the last replication
728         # so we'll only receive the objects we modify below
729         self.start_new_repl_cycle()
730
731         # add an extra level of grandchildren that hang off a child
732         # that got created last time
733         new_parent = "OU=test_new_parent,%s" % children[0]
734         self.add_object(new_parent)
735         new_children = []
736
737         for x in range(0, 50):
738             dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
739             self.add_object(dn)
740             new_children.append(dn)
741
742         # replace half of the links to point to the new children
743         for x in range(0, 50):
744             self.delete_attribute(la_sources[x], "managedBy", children[x])
745             self.modify_object(la_sources[x], "managedBy", new_children[x])
746
747         # add some filler objects to fill up the 1st chunk
748         filler = self.create_object_range(0, 100, prefix="filler")
749
750         # modify the new children/targets so they come after the link source
751         for x in range(0, 50):
752             self.modify_object(new_children[x], "displayName", "OU-%d" % x)
753
754         # modify the parent, so it now comes last in the replication
755         self.modify_object(new_parent, "displayName", "OU%d" % x)
756
757         # We should now get the modified objects in the following order:
758         # [50 links (x 2)][100 filler][50 new children][new parent]
759         # Note that the link sources aren't actually sent (their new linked
760         # attributes are sent, but apart from that, nothing has changed)
761         all_objects = filler + new_children + [new_parent]
762         expected_links = la_sources[:50]
763
764         links_expected = True
765
766         while not self.replication_complete():
767             self.repl_get_next(assert_links=links_expected)
768             links_expected = len(self.rxd_links) < len(expected_links)
769
770         self.assertTrue(self.used_get_tgt,
771                         "Test didn't use the GET_TGT flag as expected")
772         self.assertTrue(self.used_get_anc,
773                         "Test didn't use the GET_ANC flag as expected")
774
775         # Check we get all the objects we're expecting
776         self.assert_expected_data(all_objects)
777
778         # Check we received links (50 deleted links and 50 new)
779         self.assert_expected_links(expected_links, num_expected=100)
780
781     def _repl_integrity_obj_deletion(self, delete_link_source=True):
782         """
783         Tests deleting link objects while a replication is in progress.
784         """
785
786         # create some objects and link them together, with some filler
787         # object in between the link sources
788         la_sources = self.create_object_range(0, 100, prefix="la_source")
789         la_targets = self.create_object_range(0, 100, prefix="la_targets")
790
791         for i in range(0, 50):
792             self.modify_object(la_sources[i], "managedBy", la_targets[i])
793
794         filler = self.create_object_range(0, 100, prefix="filler")
795
796         for i in range(50, 100):
797             self.modify_object(la_sources[i], "managedBy", la_targets[i])
798
799         # touch the targets so that the sources get replicated first
800         for i in range(0, 100):
801             self.modify_object(la_targets[i], "displayName", "OU%d" % i)
802
803         # objects should now be in the following USN order:
804         # [50 la_source][100 filler][50 la_source][100 la_target]
805
806         # Get the first block containing 50 link sources
807         self.repl_get_next()
808
809         # delete either the link targets or link source objects
810         if delete_link_source:
811             objects_to_delete = la_sources
812             # in GET_TGT testenvs we only receive the first 50 source objects
813             expected_objects = la_sources[:50] + la_targets + filler
814         else:
815             objects_to_delete = la_targets
816             expected_objects = la_sources + filler
817
818         for obj in objects_to_delete:
819             self.ldb_dc2.delete(obj)
820
821         # complete the replication
822         while not self.replication_complete():
823             self.repl_get_next()
824
825         # Check we get all the objects we're expecting
826         self.assert_expected_data(expected_objects)
827
828         # we can't use assert_expected_links() here because it tries to check
829         # against the deleted objects on the DC. (Although we receive some
830         # links from the first block processed, the Samba client should end up
831         # deleting these, as the source/target object involved is deleted)
832         self.assertTrue(len(self.rxd_links) == 50,
833                         "Expected 50 links, not %d" % len(self.rxd_links))
834
835     def test_repl_integrity_src_obj_deletion(self):
836         self._repl_integrity_obj_deletion(delete_link_source=True)
837
838     def test_repl_integrity_tgt_obj_deletion(self):
839         self._repl_integrity_obj_deletion(delete_link_source=False)
840
841     def restore_deleted_object(self, guid, new_dn):
842         """Re-animates a deleted object"""
843
844         res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"],
845                                   controls=['show_deleted:1'], scope=ldb.SCOPE_BASE)
846         if len(res) != 1:
847             return
848
849         msg = ldb.Message()
850         msg.dn = res[0].dn
851         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
852         msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
853         self.test_ldb_dc.modify(msg, ["show_deleted:1"])
854
855     def sync_DCs(self, nc_dn=None):
856         # make sure DC1 has all the changes we've made to DC2
857         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn)
858
859     def get_object_guid(self, dn):
860         res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
861         return res[0]['objectGUID'][0]
862
863
864     def set_dc_connection(self, conn):
865         """
866         Switches over the connection state info that the underlying drs_base
867         class uses so that we replicate with a different DC.
868         """
869         self.default_hwm = conn.default_hwm
870         self.default_utdv = conn.default_utdv
871         self.drs = conn.drs
872         self.drs_handle = conn.drs_handle
873         self.set_test_ldb_dc(conn.ldb_dc)
874
875     def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
876                                              expected_links):
877         """
878         Replicates against both the primary and secondary DCs in the testenv
879         and checks that both return the expected results.
880         """
881         print("Checking replication against primary test DC...")
882
883         # get the replication data from the test DC first
884         while not self.replication_complete():
885             self.repl_get_next()
886
887         # Check we get all the objects and links we're expecting
888         self.assert_expected_data(all_objects)
889         self.assert_expected_links(expected_links)
890
891         # switch over the DC state info so we now talk to the peer DC
892         self.set_dc_connection(peer_conn)
893         self.init_test_state()
894
895         print("Checking replication against secondary test DC...")
896
897         # check that we get the same information from the 2nd DC
898         while not self.replication_complete():
899             self.repl_get_next()
900
901         self.assert_expected_data(all_objects)
902         self.assert_expected_links(expected_links)
903
904         # switch back to using the default connection
905         self.set_dc_connection(self.default_conn)
906
907     def test_repl_integrity_obj_reanimation(self):
908         """
909         Checks receiving links for a re-animated object doesn't lose links.
910         We test this against the peer DC to make sure it doesn't drop links.
911         """
912
913         # This test is a little different in that we're particularly interested
914         # in exercising the replmd client code on the second DC.
915         # First, make sure the peer DC has the base OU, then connect to it (so
916         # we store its inital HWM)
917         self.sync_DCs()
918         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
919
920         # create the link source/target objects
921         la_sources = self.create_object_range(0, 100, prefix="la_src")
922         la_targets = self.create_object_range(0, 100, prefix="la_tgt")
923
924         # store the target object's GUIDs (we need to know these to reanimate them)
925         target_guids = []
926
927         for dn in la_targets:
928             target_guids.append(self.get_object_guid(dn))
929
930         # delete the link target
931         for x in range(0, 100):
932             self.ldb_dc2.delete(la_targets[x])
933
934         # sync the DCs, then disable replication. We want the peer DC to get
935         # all the following changes in a single replication cycle
936         self.sync_DCs()
937         self._disable_all_repl(self.dnsname_dc2)
938
939         # restore the target objects for the linked attributes again
940         for x in range(0, 100):
941             self.restore_deleted_object(target_guids[x], la_targets[x])
942
943         # add the links
944         for x in range(0, 100):
945             self.modify_object(la_sources[x], "managedBy", la_targets[x])
946
947         # create some additional filler objects
948         filler = self.create_object_range(0, 100, prefix="filler")
949
950         # modify the targets so they now come last
951         for x in range(0, 100):
952             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
953
954         # the objects should now be sent in the following order:
955         # [la sources + links][filler][la targets]
956         all_objects = la_sources + la_targets + filler
957         expected_links = la_sources
958
959         # Enable replication again make sure the 2 DCs are back in sync
960         self._enable_all_repl(self.dnsname_dc2)
961         self.sync_DCs()
962
963         # Get the replication data from each DC in turn.
964         # Check that both give us all the objects and links we're expecting,
965         # i.e. no links were lost
966         self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
967                                                   expected_links)
968
969     def test_repl_integrity_cross_partition_links(self):
970         """
971         Checks that a cross-partition link to an unknown target object does
972         not result in missing links.
973         """
974
975         # check the peer DC is up-to-date, then connect (storing its HWM)
976         self.sync_DCs()
977         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
978
979         # stop replication so the peer gets the following objects in one go
980         self._disable_all_repl(self.dnsname_dc2)
981
982         # create a link source object in the main NC
983         la_source = "OU=cross_nc_src,%s" % self.ou
984         self.add_object(la_source)
985
986         # create the link target (a server object) in the config NC
987         rand = random.randint(1, 10000000)
988         la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \
989                     "CN=Sites,%s" % (rand, self.config_dn)
990         self.add_object(la_target, objectclass="server")
991
992         # add a cross-partition link between the two
993         self.modify_object(la_source, "managedBy", la_target)
994
995         # First, sync across to the peer the NC containing the link source object
996         self.sync_DCs()
997
998         # Now, before the peer has received the partition containing the target
999         # object, try replicating from the peer. It will only know about half
1000         # of the link at this point, but it should be a valid scenario
1001         self.set_dc_connection(peer_conn)
1002
1003         while not self.replication_complete():
1004             # pretend we've received other link targets out of order and that's
1005             # forced us to use GET_TGT. This checks the peer doesn't fail trying
1006             # to fetch a cross-partition target object that doesn't exist
1007             self.repl_get_next(get_tgt=True)
1008
1009         self.set_dc_connection(self.default_conn)
1010         self.init_test_state()
1011
1012         # Now sync across the partition containing the link target object
1013         self.sync_DCs(nc_dn=self.config_dn)
1014         self._enable_all_repl(self.dnsname_dc2)
1015
1016         # Get the replication data from each DC in turn.
1017         # Check that both return the cross-partition link (note we're not
1018         # checking the config domain NC here for simplicity)
1019         self.assert_DCs_replication_is_consistent(peer_conn,
1020                                                   all_objects=[la_source],
1021                                                   expected_links=[la_source])
1022
1023         # the cross-partition linked attribute has a missing backlink. Check
1024         # that we can still delete it successfully
1025         self.delete_attribute(la_source, "managedBy", la_target)
1026         self.sync_DCs()
1027
1028         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1029                                       attrs=["managedBy"],
1030                                       controls=['extended_dn:1:0'],
1031                                       scope=ldb.SCOPE_BASE)
1032         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1033                          % la_source)
1034         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1035                                       attrs=["managedBy"],
1036                                       controls=['extended_dn:1:0'],
1037                                       scope=ldb.SCOPE_BASE)
1038         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1039                          % la_source)
1040
1041         # Check receiving a cross-partition link to a deleted target.
1042         # Delete the target and make sure the deletion is sync'd between DCs
1043         target_guid = self.get_object_guid(la_target)
1044         self.test_ldb_dc.delete(la_target)
1045         self.sync_DCs(nc_dn=self.config_dn)        
1046         self._disable_all_repl(self.dnsname_dc2)
1047
1048         # re-animate the target
1049         self.restore_deleted_object(target_guid, la_target)
1050         self.modify_object(la_source, "managedBy", la_target)
1051
1052         # now sync the link - because the target is in another partition, the
1053         # peer DC receives a link for a deleted target, which it should accept
1054         self.sync_DCs()
1055         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1056                                       attrs=["managedBy"],
1057                                       controls=['extended_dn:1:0'],
1058                                       scope=ldb.SCOPE_BASE)
1059         self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute"
1060                         % la_source)
1061
1062         # cleanup the server object we created in the Configuration partition
1063         self.test_ldb_dc.delete(la_target)
1064         self._enable_all_repl(self.dnsname_dc2)
1065
1066     def test_repl_get_tgt_multivalued_links(self):
1067         """Tests replication with multi-valued link attributes."""
1068
1069         # create the target/source objects and link them together
1070         la_targets = self.create_object_range(0, 500, prefix="la_tgt")
1071         la_source = "CN=la_src,%s" % self.ou
1072         self.add_object(la_source, objectclass="msExchConfigurationContainer")
1073
1074         for tgt in la_targets:
1075             self.modify_object(la_source, "addressBookRoots2", tgt)
1076
1077         filler = self.create_object_range(0, 100, prefix="filler")
1078
1079         # We should receive the objects/links in the following order:
1080         # [500 targets + 1 source][500 links][100 filler]
1081         expected_objects = la_targets + [la_source] + filler
1082         link_only_chunk = False
1083
1084         # First do the replication without needing GET_TGT
1085         while not self.replication_complete():
1086             ctr6 = self.repl_get_next()
1087
1088             if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
1089                 link_only_chunk = True
1090
1091         # we should receive one chunk that contains only links
1092         self.assertTrue(link_only_chunk,
1093                         "Expected to receive a chunk containing only links")
1094
1095         # check we received all the expected objects/links
1096         self.assert_expected_data(expected_objects)
1097         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1098
1099         # Do the replication again, forcing the use of GET_TGT this time
1100         self.init_test_state()
1101
1102         for x in range(0, 500):
1103             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
1104
1105         # The objects/links should get sent in the following order:
1106         # [1 source][500 targets][500 links][100 filler]
1107
1108         while not self.replication_complete():
1109             ctr6 = self.repl_get_next()
1110
1111         self.assertTrue(self.used_get_tgt,
1112                         "Test didn't use the GET_TGT flag as expected")
1113
1114         # check we received all the expected objects/links
1115         self.assert_expected_data(expected_objects)
1116         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1117
1118
1119 class DcConnection:
1120     """Helper class to track a connection to another DC"""
1121
1122     def __init__(self, drs_base, ldb_dc, dnsname_dc):
1123         self.ldb_dc = ldb_dc
1124         (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1125         (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1126
1127