2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Catalyst.Net Ltd. 2017
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
28 # getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
31 from __future__ import print_function
35 from ldb import SCOPE_BASE
38 from samba.dcerpc import drsuapi
41 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
43 super(DrsReplicaSyncIntegrityTestCase, self).setUp()
45 self.init_test_state()
47 # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
48 # the vampire_dc), so we point this test directly at that DC
49 self.set_test_ldb_dc(self.ldb_dc2)
51 self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
53 self.base_dn = self.test_ldb_dc.get_default_basedn()
55 self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
56 self.set_dc_connection(self.default_conn)
59 super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
60 # tidyup groups and users
62 self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
63 except ldb.LdbError as e:
64 (enum, string) = e.args
65 if enum == ldb.ERR_NO_SUCH_OBJECT:
def init_test_state(self):
    """
    Resets the replication state that the test tracks.

    This is called from setUp() and again whenever the test switches over
    to replicating from a different DC, so that nothing received earlier
    leaks into the next set of checks.
    """
    # DNs, links and object GUIDs received so far. repl_get_next() appends
    # to these, so they have to exist before the first request is sent
    self.rxd_dn_list = []
    self.rxd_links = []
    self.rxd_guids = []

    # the last response that was accepted (None means nothing received yet)
    self.last_ctr = None

    # 100 is the minimum max_objects that Microsoft seems to honour
    # (the max honoured is 400ish), so we use that in these tests
    self.max_objects = 100

    # store whether we used GET_TGT/GET_ANC flags in the requests
    self.used_get_tgt = False
    self.used_get_anc = False
def add_object(self, dn, objectclass="organizationalunit"):
    """
    Adds an object to the test DC and checks it can be read back.

    dn is the DN of the object to create. objectclass is the objectclass
    to give it (an OU by default).
    """
    self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})

    # a base-scope search on the new DN should return exactly that object
    res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
    self.assertEqual(len(res), 1)
def modify_object(self, dn, attr, value):
    """
    Modifies an object's USN by adding an attribute value to it

    dn is the object to modify, attr the attribute name and value the
    value that gets added to that attribute on the test DC.
    """
    # the modify request has to be built up in a fresh ldb message
    m = ldb.Message()
    m.dn = ldb.Dn(self.test_ldb_dc, dn)
    m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
    self.test_ldb_dc.modify(m)
def delete_attribute(self, dn, attr, value):
    """
    Deletes an attribute from an object

    dn is the object to modify, attr the attribute name and value the
    value passed along with the delete request sent to the test DC.
    """
    # the modify request has to be built up in a fresh ldb message
    m = ldb.Message()
    m.dn = ldb.Dn(self.test_ldb_dc, dn)
    m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
    self.test_ldb_dc.modify(m)
def start_new_repl_cycle(self):
    """Resets enough state info to start a new replication cycle"""
    # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
    # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
    # resolve it
    self.rxd_links = []

    self.used_get_tgt = False
    self.used_get_anc = False

    # mostly preserve self.last_ctr, so that we use the last HWM. Setting
    # more_data means replication_complete() no longer reports the (old)
    # cycle as finished
    if self.last_ctr is not None:
        self.last_ctr.more_data = True
def create_object_range(self, start, end, prefix="",
                        children=None, parent_list=None):
    """
    Creates a block of objects. Object names are numbered sequentially,
    using the optional prefix supplied. If the children parameter is
    supplied it will create a parent-child hierarchy and return the
    top-level parents separately.

    start/end give the range of object numbers to create (end is
    exclusive). children is a sequence of labels, one child is created
    under each parent per label. parent_list is a caller-owned list that
    the DNs of the top-level parents get appended to. Children look their
    parent up as parent_list[x], so when a hierarchy is created over
    several calls the same list has to be passed in each time.

    Returns the DNs of every object created, all the parents first and
    then all the children.
    """
    dn_list = []

    # Use dummy/empty lists if we're not creating a parent/child hierarchy
    if children is None:
        children = []

    if parent_list is None:
        parent_list = []

    # Create the parents first, then the children.
    # This makes it easier to see in debug when GET_ANC takes effect
    # because the parent/children become interleaved (by default,
    # this approach means the objects are organized into blocks of
    # parents and blocks of children together)
    for x in range(start, end):
        ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
        self.add_object(ou)
        dn_list.append(ou)

        # keep track of the top-level parents (if needed)
        parent_list.append(ou)

    # create the block of children (if needed)
    for x in range(start, end):
        for child in children:
            ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
            self.add_object(ou)
            dn_list.append(ou)

    return dn_list
def assert_expected_data(self, expected_list):
    """
    Asserts that we received all the DNs that we expected.

    expected_list is the list of DNs that must be present in the DNs
    received so far (self.rxd_dn_list). Receiving extra objects is only
    reported, it does not fail the test.
    """
    received_list = self.rxd_dn_list

    # Note that with GET_ANC Windows can end up sending the same parent
    # object multiple times, so this might be noteworthy but doesn't
    # warrant failing the test
    num_received = len(received_list)
    num_expected = len(expected_list)
    if num_received != num_expected:
        print("Note: received %d objects but expected %d" % (num_received,
                                                             num_expected))

    # Check that we received every object that we were expecting
    for dn in expected_list:
        self.assertTrue(dn in received_list,
                        "DN '%s' missing from replication." % dn)
def test_repl_integrity(self):
    """
    Modify the objects being replicated while the replication is still
    in progress and check that no object loss occurs.
    """

    # The server behaviour differs between samba and Windows. Samba returns
    # the objects in the original order (up to the pre-modify HWM). Windows
    # incorporates the modified objects and returns them in the new order
    # (i.e. modified objects last), up to the post-modify HWM. The
    # Microsoft docs state the Windows behaviour is optional.

    # Create a range of objects to replicate.
    expected_dn_list = self.create_object_range(0, 400)
    (orig_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)

    # We ask for the first page of 100 objects.
    # For this test, we don't care what order we receive the objects in,
    # so long as by the end we've received everything
    self.repl_get_next()

    # Modify some of the second page of objects. This should bump the
    # highwater-mark (which is checked below)
    for x in range(100, 200):
        self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)

    (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
    self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)

    # Get the remaining blocks of data
    while not self.replication_complete():
        self.repl_get_next()

    # Check we still receive all the objects we're expecting
    self.assert_expected_data(expected_dn_list)
def is_parent_known(self, dn, known_dn_list):
    """
    Returns True if the parent of the dn specified is in known_dn_list

    Objects outside of the test OU are not tracked by the test, so they
    are always reported as having a known parent.
    """

    # we can sometimes get system objects like the RID Manager returned.
    # Ignore anything that is not under the test OU we created
    if self.ou not in dn:
        return True

    # Remove the child portion from the name to get the parent's DN
    # (i.e. everything after the first comma)
    parent_dn = dn.partition(",")[2]

    # check either this object is a parent (its parent is the top-level
    # test object), or its parent has been seen previously
    return parent_dn == self.ou or parent_dn in known_dn_list
def _repl_send_request(self, get_anc=False, get_tgt=False):
    """
    Sends a GetNCChanges request for the next block of replication data.

    get_anc sets the GET_ANC replica flag and get_tgt sets the GET_TGT
    'more' flag on the request. Using either flag is recorded in
    self.used_get_anc/self.used_get_tgt. Returns the value returned by
    self._get_replication() for the request.
    """

    # we're just trying to mimic regular client behaviour here, so just
    # use the highwatermark in the last response we received
    if self.last_ctr is not None:
        highwatermark = self.last_ctr.new_highwatermark
        uptodateness_vector = self.last_ctr.uptodateness_vector
    else:
        # this is the first replication chunk
        # NOTE(review): None is assumed to make _get_replication() fall
        # back to its own default HWM/UTDV - confirm against drs_base
        highwatermark = None
        uptodateness_vector = None

    # Ask for the next block of replication data
    replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
    more_flags = 0

    if get_anc:
        replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
        self.used_get_anc = True

    if get_tgt:
        more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
        self.used_get_tgt = True

    # return the response from the DC
    return self._get_replication(replica_flags,
                                 max_objects=self.max_objects,
                                 highwatermark=highwatermark,
                                 uptodateness_vector=uptodateness_vector,
                                 more_flags=more_flags)
def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
    """
    Requests the next block of replication data. This tries to simulate
    client behaviour - if we receive a replicated object that we don't know
    the parent of, then re-request the block with the GET_ANC flag set.
    If we don't know the target object for a linked attribute, then
    re-request with GET_TGT.

    If assert_links is set then the response must contain at least one
    link. A response is only stored (in self.last_ctr, self.rxd_dn_list,
    self.rxd_guids and self.rxd_links) once every parent, link source and
    link target in it is known. Returns the response that was stored.
    """

    # send a request to the DC and get the response
    ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)

    # extract the object DNs and their GUIDs from the response
    rxd_dn_list = self._get_ctr6_dn_list(ctr6)
    rxd_guid_list = self._get_ctr6_object_guids(ctr6)

    # we'll add new objects as we discover them, so take a copy of the
    # ones we already know about, so we can modify these lists safely
    known_objects = self.rxd_dn_list[:]
    known_guids = self.rxd_guids[:]

    # check that we know the parent for every object received
    for dn, guid in zip(rxd_dn_list, rxd_guid_list):

        if self.is_parent_known(dn, known_objects):

            # the new DN is now known so add it to the list.
            # It may be the parent of another child in this block
            known_objects.append(dn)
            known_guids.append(guid)
        else:
            # If we've already set the GET_ANC flag then it should mean
            # we receive the parents before the child
            self.assertFalse(get_anc, "Unknown parent for object %s" % dn)

            print("Unknown parent for %s - try GET_ANC" % dn)

            # try the same thing again with the GET_ANC flag set this time
            return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                      assert_links=assert_links)

    # check we know about references to any objects in the linked attrs
    received_links = self._get_ctr6_links(ctr6)

    # This is so that older versions of Samba fail - we want the links to
    # be sent roughly with the objects, rather than getting all links at
    # the end
    if assert_links:
        self.assertTrue(len(received_links) > 0,
                        "Links were expected in the GetNCChanges response")

    for link in received_links:

        # skip any links that aren't part of the test
        if self.ou not in link.targetDN:
            continue

        # check the source object is known (Windows can actually send links
        # where we don't know the source object yet). Samba shouldn't ever
        # hit this case because it gets the links based on the source
        if link.identifier not in known_guids:

            # If we've already set the GET_ANC flag then it should mean
            # this case doesn't happen
            self.assertFalse(get_anc, "Unknown source object for GUID %s"
                             % link.identifier)

            print("Unknown source GUID %s - try GET_ANC" % link.identifier)

            # try the same thing again with the GET_ANC flag set this time
            return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
                                      assert_links=assert_links)

        # check we know the target object
        if link.targetGUID not in known_guids:

            # If we've already set the GET_TGT flag then we should have
            # already received any objects we need to know about
            self.assertFalse(get_tgt, "Unknown linked target for object %s"
                             % link.targetDN)

            print("Unknown target for %s - try GET_TGT" % link.targetDN)

            # try the same thing again with the GET_TGT flag set this time
            return self.repl_get_next(get_anc=get_anc, get_tgt=True,
                                      assert_links=assert_links)

    # store the last successful result so we know what HWM to request next
    self.last_ctr = ctr6

    # store the objects, GUIDs, and links we received (these were already
    # extracted from the response above, so there's no need to do it again)
    self.rxd_dn_list += rxd_dn_list
    self.rxd_links += received_links
    self.rxd_guids += rxd_guid_list

    return ctr6
def replication_complete(self):
    """Returns True if the current/last replication cycle is complete"""

    # if we haven't received any response yet, the cycle hasn't started
    if self.last_ctr is None:
        return False

    # otherwise we're done once the DC says there's no more data to send
    return not self.last_ctr.more_data
def test_repl_integrity_get_anc(self):
    """
    Modify the parent objects being replicated while the replication is
    still in progress (using GET_ANC) and check that no object loss occurs.
    """

    # Note that GET_ANC behaviour varies between Windows and Samba.
    # On Samba GET_ANC results in the replication restarting from the very
    # beginning. After that, Samba remembers GET_ANC and also sends the
    # parents in subsequent requests (regardless of whether GET_ANC is
    # specified in the later request).
    # Windows only sends the parents if GET_ANC was specified in the last
    # request. It will also resend a parent, even if it's already sent the
    # parent in a previous response (whereas Samba doesn't).

    # Create a small block of 50 parents, each with 2 children (A and B)
    # This is so that we receive some children in the first block, so we
    # can resend with GET_ANC before we learn too many parents
    parent_dn_list = []
    expected_dn_list = self.create_object_range(0, 50, prefix="parent",
                                                children=("A", "B"),
                                                parent_list=parent_dn_list)

    # create the remaining parents and children
    expected_dn_list += self.create_object_range(50, 150, prefix="parent",
                                                 children=("A", "B"),
                                                 parent_list=parent_dn_list)

    # We've now got objects in the following order:
    # [50 parents][100 children][100 parents][200 children]

    # Modify the first parent so that it's now ordered last by USN
    # This means we set the GET_ANC flag pretty much straight away
    # because we receive the first child before the first parent
    self.modify_object(parent_dn_list[0], "displayName", "OU0")

    # modify a later block of parents so they also get reordered
    for x in range(50, 100):
        self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)

    # Get the first block of objects - this should resend the request with
    # GET_ANC set because we won't know about the first child's parent.
    # On samba GET_ANC essentially starts the sync from scratch again, so
    # we get this over with early before we learn too many parents
    self.repl_get_next()

    # modify the last chunk of parents. They should now have a USN higher
    # than the highwater-mark for the replication cycle
    for x in range(100, 150):
        self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)

    # Get the remaining blocks of data - this will resend the request with
    # GET_ANC if it encounters an object it doesn't have the parent for.
    while not self.replication_complete():
        self.repl_get_next()

    # The way the test objects have been created should force
    # self.repl_get_next() to use the GET_ANC flag. If this doesn't
    # actually happen, then the test isn't doing its job properly
    self.assertTrue(self.used_get_anc,
                    "Test didn't use the GET_ANC flag as expected")

    # Check we get all the objects we're expecting
    self.assert_expected_data(expected_dn_list)
def assert_expected_links(self, objects_with_links, link_attr="managedBy",
                          num_expected=None):
    """
    Asserts that a GetNCChanges response contains any expected links
    for the objects it contains.

    objects_with_links is the list of DNs of the link source objects and
    link_attr is the linked attribute to check on each of them.
    num_expected is the total number of links that should have been
    received. It defaults to one link per object in objects_with_links.
    """
    received_links = self.rxd_links

    if num_expected is None:
        num_expected = len(objects_with_links)

    self.assertTrue(len(received_links) == num_expected,
                    "Received %d links but expected %d"
                    % (len(received_links), num_expected))

    for dn in objects_with_links:
        self.assert_object_has_link(dn, link_attr, received_links)
def assert_object_has_link(self, dn, link_attr, received_links):
    """
    Queries the object in the DB and asserts there is a link in the
    GetNCChanges response that matches.

    dn is the link source object, link_attr the linked attribute to look
    up on it, and received_links the links received via replication. Every
    value of link_attr in the DB must have a received link with matching
    source and target GUIDs.
    """

    # Look up the link attribute in the DB
    # The extended_dn option will dump the GUID info for the link
    # attribute (as a hex blob)
    res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
                                  controls=['extended_dn:1:0'],
                                  scope=ldb.SCOPE_BASE)

    # We didn't find the expected link attribute in the DB for the object.
    # Something has gone wrong somewhere...
    self.assertTrue(link_attr in res[0],
                    "%s in DB doesn't have attribute %s" % (dn, link_attr))

    # the source object is the same for every value of the attribute
    sourceGUID_blob = res[0].dn.get_extended_component("GUID")

    # find the received link in the list and assert that the target and
    # source GUIDs match what's in the DB
    for val in [str(val) for val in res[0][link_attr]]:
        # Work out the expected target GUID for the DB link
        target_dn = ldb.Dn(self.test_ldb_dc, val)
        targetGUID_blob = target_dn.get_extended_component("GUID")

        found = False

        for link in received_links:
            if (link.selfGUID_blob == sourceGUID_blob and
                    link.targetGUID_blob == targetGUID_blob):

                found = True
                print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))

                # one matching link is enough for this value
                break

        self.assertTrue(found,
                        "Did not receive expected link for DN %s" % dn)
def test_repl_get_tgt(self):
    """
    Creates a scenario where we should receive the linked attribute before
    we know about the target object, and therefore need to use GET_TGT.
    Note: Samba currently avoids this problem by sending all its links last
    """

    # create the test objects
    reportees = self.create_object_range(0, 100, prefix="reportee")
    managers = self.create_object_range(0, 100, prefix="manager")
    all_objects = managers + reportees
    expected_links = reportees

    # add a link attribute to each reportee object that points to the
    # corresponding manager object as the target
    for reportee, manager in zip(reportees, managers):
        self.modify_object(reportee, "managedBy", manager)

    # touch the managers (the link-target objects) again to make sure the
    # reportees (link source objects) get returned first by the replication
    for i, manager in enumerate(managers):
        self.modify_object(manager, "displayName", "OU%d" % i)

    links_expected = True

    # Get all the replication data - this code should resend the requests
    # with GET_TGT
    while not self.replication_complete():

        # get the next block of replication data (this sets GET_TGT
        # if needed). Links are only demanded from a block while we've
        # received fewer links than we expect overall
        self.repl_get_next(assert_links=links_expected)
        links_expected = len(self.rxd_links) < len(expected_links)

    # The way the test objects have been created should force
    # self.repl_get_next() to use the GET_TGT flag. If this doesn't
    # actually happen, then the test isn't doing its job properly
    self.assertTrue(self.used_get_tgt,
                    "Test didn't use the GET_TGT flag as expected")

    # Check we get all the objects we're expecting
    self.assert_expected_data(all_objects)

    # Check we received links for all the reportees
    self.assert_expected_links(expected_links)
def test_repl_get_tgt_chain(self):
    """
    Tests the behaviour of GET_TGT with a more complicated scenario.
    Here we create a chain of objects linked together, so if we follow
    the link target, then we'd traverse ~200 objects each time.
    """

    # create the test objects
    objectsA = self.create_object_range(0, 100, prefix="AAA")
    objectsB = self.create_object_range(0, 100, prefix="BBB")
    objectsC = self.create_object_range(0, 100, prefix="CCC")

    # create a complex set of object links:
    #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
    # Basically each object-A should link to a circular chain of 200 B/C
    # objects. We create the links in separate chunks here, as it makes it
    # clearer what happens with the USN (links on Windows have their own
    # USN, so this approach means the A->B/B->C links aren't interleaved)
    for i in range(0, 100):
        self.modify_object(objectsA[i], "managedBy", objectsB[i])

    for i in range(0, 100):
        self.modify_object(objectsB[i], "managedBy",
                           objectsC[(i + 1) % 100])

    for i in range(0, 100):
        self.modify_object(objectsC[i], "managedBy",
                           objectsB[(i + 1) % 100])

    all_objects = objectsA + objectsB + objectsC
    expected_links = all_objects

    # the default order the objects now get returned in should be:
    # [A0-A99][B0-B99][C0-C99]

    links_expected = True

    # Get all the replication data - this code should resend the requests
    # with GET_TGT
    while not self.replication_complete():

        # get the next block of replication data (this sets GET_TGT
        # if needed). Links are only demanded from a block while we've
        # received fewer links than we expect overall
        self.repl_get_next(assert_links=links_expected)
        links_expected = len(self.rxd_links) < len(expected_links)

    # The way the test objects have been created should force
    # self.repl_get_next() to use the GET_TGT flag. If this doesn't
    # actually happen, then the test isn't doing its job properly
    self.assertTrue(self.used_get_tgt,
                    "Test didn't use the GET_TGT flag as expected")

    # Check we get all the objects we're expecting
    self.assert_expected_data(all_objects)

    # Check we received links for all the objects in the chain (every A, B
    # and C object is the source of one link)
    self.assert_expected_links(expected_links)
def test_repl_integrity_link_attr(self):
    """
    Tests adding links to new objects while a replication is in progress.
    """

    # create some source objects for the linked attributes, sandwiched
    # between 2 blocks of filler objects
    filler = self.create_object_range(0, 100, prefix="filler")
    reportees = self.create_object_range(0, 100, prefix="reportee")
    filler += self.create_object_range(100, 200, prefix="filler")

    # Start the replication and get the first block of filler objects
    # (We're being mean here and setting the GET_TGT flag right from the
    # start. On earlier Samba versions, if the client encountered an
    # unknown target object and retried with GET_TGT, it would restart the
    # replication cycle from scratch, which avoids the problem).
    self.repl_get_next(get_tgt=True)

    # create the target objects and add the links. These objects should be
    # outside the scope of the Samba replication cycle, but the links
    # should still get sent with the source object
    managers = self.create_object_range(0, 100, prefix="manager")

    for reportee, manager in zip(reportees, managers):
        self.modify_object(reportee, "managedBy", manager)

    expected_objects = managers + reportees + filler
    expected_links = reportees

    # complete the replication
    while not self.replication_complete():
        self.repl_get_next(get_tgt=True)

    # If we didn't receive the most recently created objects in the last
    # replication cycle, then kick off another replication to get them.
    # The previous cycle is already complete at this point, so an explicit
    # first request is needed to get the next cycle going. (We can't use
    # start_new_repl_cycle() for that, because it throws away the links
    # that have already been received)
    if len(self.rxd_dn_list) < len(expected_objects):
        self.repl_get_next()

        while not self.replication_complete():
            self.repl_get_next()

    # Check we get all the objects we're expecting
    self.assert_expected_data(expected_objects)

    # Check we received links for all the reportees
    self.assert_expected_links(expected_links)
def test_repl_get_anc_link_attr(self):
    """
    A basic GET_ANC test where the parents have linked attributes
    """

    # Create a block of 100 parents and 100 children (i.e. a single child
    # per parent - the label used for the child is arbitrary)
    parent_dn_list = []
    expected_dn_list = self.create_object_range(0, 100, prefix="parent",
                                                children=("A",),
                                                parent_list=parent_dn_list)

    # Add links from the parents to the children. The first 100 DNs
    # returned are the parents, so child x is at index x + 100
    for x in range(0, 100):
        self.modify_object(parent_dn_list[x], "managedBy",
                           expected_dn_list[x + 100])

    # add some filler objects at the end. This allows us to easily see
    # which chunk the links get sent in
    expected_dn_list += self.create_object_range(0, 100, prefix="filler")

    # We've now got objects in the following order:
    # [100 x children][100 x parents][100 x filler]

    # Get the replication data - because the block of children come first,
    # this should retry the request with GET_ANC
    while not self.replication_complete():
        self.repl_get_next()

    self.assertTrue(self.used_get_anc,
                    "Test didn't use the GET_ANC flag as expected")

    # Check we get all the objects we're expecting
    self.assert_expected_data(expected_dn_list)

    # Check we received links for all the parents
    self.assert_expected_links(parent_dn_list)
def test_repl_get_tgt_and_anc(self):
    """
    Check we can resolve an unknown ancestor when fetching the link target,
    i.e. tests using GET_TGT and GET_ANC in combination
    """

    # Create some parent/child objects (the child will be the link target).
    # There's a single child per parent - the label used is arbitrary
    parents = []
    all_objects = self.create_object_range(0, 100, prefix="parent",
                                           children=("la_tgt",),
                                           parent_list=parents)

    # everything that isn't a parent is a child (the DNs are plain strings,
    # so a set keeps this lookup cheap)
    parent_set = set(parents)
    children = [item for item in all_objects if item not in parent_set]

    # create the link source objects and link them to the child/target
    la_sources = self.create_object_range(0, 100, prefix="la_src")
    all_objects += la_sources

    for i in range(0, 100):
        self.modify_object(la_sources[i], "managedBy", children[i])

    expected_links = la_sources

    # modify the children/targets so they come after the link source
    for x in range(0, 100):
        self.modify_object(children[x], "displayName", "OU%d" % x)

    # modify the parents, so they now come last in the replication
    for x in range(0, 100):
        self.modify_object(parents[x], "displayName", "OU%d" % x)

    # We've now got objects in the following order:
    # [100 la_source][100 la_target][100 parents (of la_target)]

    links_expected = True

    # Get all the replication data - this code should resend the requests
    # with GET_TGT and GET_ANC
    while not self.replication_complete():

        # get the next block of replication data (this sets
        # GET_TGT/GET_ANC if needed)
        self.repl_get_next(assert_links=links_expected)
        links_expected = len(self.rxd_links) < len(expected_links)

    # The way the test objects have been created should force
    # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
    # doesn't actually happen, then the test isn't doing its job properly
    self.assertTrue(self.used_get_tgt,
                    "Test didn't use the GET_TGT flag as expected")
    self.assertTrue(self.used_get_anc,
                    "Test didn't use the GET_ANC flag as expected")

    # Check we get all the objects we're expecting
    self.assert_expected_data(all_objects)

    # Check we received links for all the link sources
    self.assert_expected_links(expected_links)

    # Second part of test. Add some extra objects and kick off another
    # replication. The test code will use the HWM from the last replication
    # so we'll only receive the objects we modify below
    self.start_new_repl_cycle()

    # add an extra level of grandchildren that hang off a child
    # that got created last time
    new_parent = "OU=test_new_parent,%s" % children[0]
    self.add_object(new_parent)

    num_new_children = 50
    new_children = []

    for x in range(0, num_new_children):
        dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
        self.add_object(dn)
        new_children.append(dn)

    # replace half of the links to point to the new children
    for x in range(0, num_new_children):
        self.delete_attribute(la_sources[x], "managedBy", children[x])
        self.modify_object(la_sources[x], "managedBy", new_children[x])

    # add some filler objects to fill up the 1st chunk
    filler = self.create_object_range(0, 100, prefix="filler")

    # modify the new children/targets so they come after the link source
    for x in range(0, num_new_children):
        self.modify_object(new_children[x], "displayName", "OU-%d" % x)

    # modify the parent, so it now comes last in the replication. (The
    # value written is the same one as before, it just no longer depends
    # on a loop variable leaking out of the loop above)
    self.modify_object(new_parent, "displayName",
                       "OU%d" % (num_new_children - 1))

    # We should now get the modified objects in the following order:
    # [50 links (x 2)][100 filler][50 new children][new parent]
    # Note that the link sources aren't actually sent (their new linked
    # attributes are sent, but apart from that, nothing has changed)
    all_objects = filler + new_children + [new_parent]
    expected_links = la_sources[:num_new_children]

    links_expected = True

    while not self.replication_complete():
        self.repl_get_next(assert_links=links_expected)
        links_expected = len(self.rxd_links) < len(expected_links)

    self.assertTrue(self.used_get_tgt,
                    "Test didn't use the GET_TGT flag as expected")
    self.assertTrue(self.used_get_anc,
                    "Test didn't use the GET_ANC flag as expected")

    # Check we get all the objects we're expecting
    self.assert_expected_data(all_objects)

    # Check we received links (50 deleted links and 50 new)
    self.assert_expected_links(expected_links, num_expected=100)
def _repl_integrity_obj_deletion(self, delete_link_source=True):
    """
    Tests deleting link objects while a replication is in progress.

    If delete_link_source is True then the link source objects get deleted
    part-way through the replication, otherwise the link target objects
    get deleted.
    """

    # create some objects and link them together, with some filler
    # object in between the link sources
    la_sources = self.create_object_range(0, 100, prefix="la_source")
    la_targets = self.create_object_range(0, 100, prefix="la_targets")

    for i in range(0, 50):
        self.modify_object(la_sources[i], "managedBy", la_targets[i])

    filler = self.create_object_range(0, 100, prefix="filler")

    for i in range(50, 100):
        self.modify_object(la_sources[i], "managedBy", la_targets[i])

    # touch the targets so that the sources get replicated first
    for i in range(0, 100):
        self.modify_object(la_targets[i], "displayName", "OU%d" % i)

    # objects should now be in the following USN order:
    # [50 la_source][100 filler][50 la_source][100 la_target]

    # Get the first block containing 50 link sources
    self.repl_get_next()

    # delete either the link targets or link source objects
    if delete_link_source:
        objects_to_delete = la_sources
        # in GET_TGT testenvs we only receive the first 50 source objects
        expected_objects = la_sources[:50] + la_targets + filler
    else:
        objects_to_delete = la_targets
        expected_objects = la_sources + filler

    for obj in objects_to_delete:
        self.ldb_dc2.delete(obj)

    # complete the replication
    while not self.replication_complete():
        self.repl_get_next()

    # Check we get all the objects we're expecting
    self.assert_expected_data(expected_objects)

    # we can't use assert_expected_links() here because it tries to check
    # against the deleted objects on the DC. (Although we receive some
    # links from the first block processed, the Samba client should end up
    # deleting these, as the source/target object involved is deleted)
    self.assertTrue(len(self.rxd_links) == 50,
                    "Expected 50 links, not %d" % len(self.rxd_links))
def test_repl_integrity_src_obj_deletion(self):
    """Runs the object deletion scenario with link-source deletion selected"""
    delete_sources = True
    self._repl_integrity_obj_deletion(delete_link_source=delete_sources)
def test_repl_integrity_tgt_obj_deletion(self):
    """Runs the object deletion scenario with link-source deletion off"""
    delete_sources = False
    self._repl_integrity_obj_deletion(delete_link_source=delete_sources)
def restore_deleted_object(self, guid, new_dn):
    """
    Re-animates a deleted object

    guid identifies the deleted object and new_dn is the DN the object
    gets given when it's restored.
    """

    # deleted objects are only visible with the show_deleted control, and
    # we look the object up by GUID rather than by DN
    guid_str = self._GUID_string(guid)
    res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
                                  controls=['show_deleted:1'],
                                  scope=ldb.SCOPE_BASE)
    self.assertEqual(len(res), 1,
                     "Couldn't find deleted object %s" % guid_str)

    # restore the object by removing isDeleted and setting the new DN in a
    # single modify of the object that was found
    msg = ldb.Message()
    msg.dn = res[0].dn
    msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
                                          "isDeleted")
    msg["distinguishedName"] = ldb.MessageElement([new_dn],
                                                  ldb.FLAG_MOD_REPLACE,
                                                  "distinguishedName")
    self.test_ldb_dc.modify(msg, ["show_deleted:1"])
def sync_DCs(self, nc_dn=None):
    """
    Replicates from DC2 to DC1.

    nc_dn is passed through to self._net_drs_replicate().
    NOTE(review): the nc_dn keyword name and the meaning of None (assumed
    to select the helper's default naming context) come from the drs_base
    helper, which isn't visible here - confirm
    """
    # make sure DC1 has all the changes we've made to DC2
    self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
                            nc_dn=nc_dn)
def get_object_guid(self, dn):
    """Returns the first objectGUID value of the object with the given DN"""
    # only the objectGUID attribute of that one object is asked for
    matches = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
                                      scope=ldb.SCOPE_BASE)
    first_match = matches[0]
    return first_match['objectGUID'][0]
def set_dc_connection(self, conn):
    """
    Switches over the connection state info that the underlying drs_base
    class uses so that we replicate with a different DC.

    conn provides the default_hwm, default_utdv, drs_handle and ldb_dc
    that get used from now on.
    """
    self.default_hwm = conn.default_hwm
    self.default_utdv = conn.default_utdv

    # NOTE(review): only the handle is switched over here. If the
    # connection also carries its own bound DRS pipe then that presumably
    # needs switching as well - confirm against DcConnection
    self.drs_handle = conn.drs_handle
    self.set_test_ldb_dc(conn.ldb_dc)
def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
                                         expected_links):
    """
    Replicates against both the primary and secondary DCs in the testenv
    and checks that both return the expected results.

    peer_conn is the connection to the secondary DC. all_objects and
    expected_links are what assert_expected_data() and
    assert_expected_links() get checked against for each DC. The default
    connection is always switched back to afterwards, even if one of the
    checks against the secondary DC fails.
    """
    print("Checking replication against primary test DC...")

    # get the replication data from the test DC first
    while not self.replication_complete():
        self.repl_get_next()

    # Check we get all the objects and links we're expecting
    self.assert_expected_data(all_objects)
    self.assert_expected_links(expected_links)

    # switch over the DC state info so we now talk to the peer DC
    self.set_dc_connection(peer_conn)
    try:
        self.init_test_state()

        print("Checking replication against secondary test DC...")

        # check that we get the same information from the 2nd DC
        while not self.replication_complete():
            self.repl_get_next()

        self.assert_expected_data(all_objects)
        self.assert_expected_links(expected_links)
    finally:
        # switch back to using the default connection
        self.set_dc_connection(self.default_conn)
def test_repl_integrity_obj_reanimation(self):
    """
    Checks receiving links for a re-animated object doesn't lose links.
    We test this against the peer DC to make sure it doesn't drop links.
    """

    # This test is a little different in that we're particularly interested
    # in exercising the replmd client code on the second DC.
    # First, make sure the peer DC has the base OU, then connect to it (so
    # we store its initial HWM)
    self.sync_DCs()
    peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)

    # create the link source/target objects
    la_sources = self.create_object_range(0, 100, prefix="la_src")
    la_targets = self.create_object_range(0, 100, prefix="la_tgt")

    # store the target object's GUIDs (we need to know these to
    # restore the objects once they've been deleted)
    target_guids = [self.get_object_guid(dn) for dn in la_targets]

    # delete the link target
    for dn in la_targets:
        self.ldb_dc2.delete(dn)

    # sync the DCs, then disable replication. We want the peer DC to get
    # all the following changes in a single replication cycle
    self.sync_DCs()
    self._disable_all_repl(self.dnsname_dc2)

    try:
        # restore the target objects for the linked attributes again
        for guid, dn in zip(target_guids, la_targets):
            self.restore_deleted_object(guid, dn)

        # add the links from the source objects to the restored targets
        for source, target in zip(la_sources, la_targets):
            self.modify_object(source, "managedBy", target)

        # create some additional filler objects
        filler = self.create_object_range(0, 100, prefix="filler")

        # modify the targets so they now come last
        for x in range(0, 100):
            self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
    finally:
        # Enable replication again (even if one of the steps above failed,
        # so that replication isn't left switched off on the DC)
        self._enable_all_repl(self.dnsname_dc2)

    # the objects should now be sent in the following order:
    # [la sources + links][filler][la targets]
    all_objects = la_sources + la_targets + filler
    expected_links = la_sources

    # make sure the 2 DCs are back in sync
    self.sync_DCs()

    # Get the replication data from each DC in turn.
    # Check that both give us all the objects and links we're expecting,
    # i.e. no links were lost
    self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
                                              expected_links)
def test_repl_integrity_cross_partition_links(self):
    """
    Checks that a cross-partition link to an unknown target object does
    not result in missing links.
    """

    def has_managed_by(ldb_dc):
        # Query the given DC's own connection, so that each check really
        # inspects that DC's database rather than the current test DC
        res = ldb_dc.search(ldb.Dn(ldb_dc, la_source),
                            attrs=["managedBy"],
                            controls=['extended_dn:1:0'],
                            scope=ldb.SCOPE_BASE)
        return "managedBy" in res[0]

    # check the peer DC is up-to-date, then connect (storing its HWM)
    self.sync_DCs()
    peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)

    # stop replication so the peer gets the following objects in one go
    self._disable_all_repl(self.dnsname_dc2)
    try:
        # create a link source object in the main NC
        la_source = "OU=cross_nc_src,%s" % self.ou
        self.add_object(la_source)

        # create the link target (a server object) in the config NC
        sites_dn = "CN=Sites,%s" % self.config_dn
        servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
        rand = random.randint(1, 10000000)
        la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
        self.add_object(la_target, objectclass="server")

        # add a cross-partition link between the two
        self.modify_object(la_source, "managedBy", la_target)

        # First, sync to the peer the NC containing the link source object
        self.sync_DCs()

        # Now, before the peer has received the partition containing the
        # target object, try replicating from the peer. It will only know
        # about half of the link at this point, but it should be a valid
        # scenario
        self.set_dc_connection(peer_conn)

        while not self.replication_complete():
            # pretend we've received other link targets out of order and
            # that's forced us to use GET_TGT. This checks the peer doesn't
            # fail trying to fetch a cross-partition target object that
            # doesn't exist
            self.repl_get_next(get_tgt=True)

        self.set_dc_connection(self.default_conn)
        self.init_test_state()

        # Now sync across the partition containing the link target object
        self.sync_DCs(nc_dn=self.config_dn)
    finally:
        # never leave DC2 with replication disabled if a step above fails
        self._enable_all_repl(self.dnsname_dc2)

    # Get the replication data from each DC in turn.
    # Check that both return the cross-partition link (note we're not
    # checking the config domain NC here for simplicity)
    self.assert_DCs_replication_is_consistent(peer_conn,
                                              all_objects=[la_source],
                                              expected_links=[la_source])

    # the cross-partition linked attribute has a missing backlink. Check
    # that we can still delete it successfully
    self.delete_attribute(la_source, "managedBy", la_target)
    self.sync_DCs()

    # the link must be gone from both DCs
    self.assertFalse(has_managed_by(self.ldb_dc1),
                     "%s in DB still has managedBy attribute" % la_source)
    self.assertFalse(has_managed_by(self.ldb_dc2),
                     "%s in DB still has managedBy attribute" % la_source)

    # Check receiving a cross-partition link to a deleted target.
    # Delete the target and make sure the deletion is sync'd between DCs
    target_guid = self.get_object_guid(la_target)
    self.test_ldb_dc.delete(la_target)
    self.sync_DCs(nc_dn=self.config_dn)
    self._disable_all_repl(self.dnsname_dc2)
    try:
        # re-animate the target
        self.restore_deleted_object(target_guid, la_target)
        self.modify_object(la_source, "managedBy", la_target)

        # now sync the link - because the target is in another partition,
        # the peer DC receives a link for a deleted target, which it should
        # accept
        self.sync_DCs()
        self.assertTrue(has_managed_by(self.ldb_dc1),
                        "%s in DB missing managedBy attribute" % la_source)

        # cleanup the server object we created in the Configuration
        # partition
        self.test_ldb_dc.delete(la_target)
    finally:
        self._enable_all_repl(self.dnsname_dc2)
def test_repl_get_tgt_multivalued_links(self):
    """Tests replication with multi-valued link attributes."""

    # create the target/source objects and link them together
    la_targets = self.create_object_range(0, 500, prefix="la_tgt")
    la_source = "CN=la_src,%s" % self.ou
    self.add_object(la_source, objectclass="msExchConfigurationContainer")

    for tgt in la_targets:
        self.modify_object(la_source, "addressBookRoots2", tgt)

    filler = self.create_object_range(0, 100, prefix="filler")

    # We should receive the objects/links in the following order:
    # [500 targets + 1 source][500 links][100 filler]
    expected_objects = la_targets + [la_source] + filler
    link_only_chunk = False

    # First do the replication without needing GET_TGT
    while not self.replication_complete():
        ctr6 = self.repl_get_next()

        if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
            link_only_chunk = True

    # we should receive one chunk that contains only links
    self.assertTrue(link_only_chunk,
                    "Expected to receive a chunk containing only links")

    # check we received all the expected objects/links
    # NOTE(review): the keyword for the expected link count is defined by
    # assert_expected_links(), which is outside this view - confirm the
    # name 'num_expected' against that helper.
    self.assert_expected_data(expected_objects)
    self.assert_expected_links([la_source], link_attr="addressBookRoots2",
                               num_expected=500)

    # Do the replication again, forcing the use of GET_TGT this time
    self.init_test_state()

    for x, tgt in enumerate(la_targets):
        self.modify_object(tgt, "displayName", "OU-%d" % x)

    # The objects/links should get sent in the following order:
    # [1 source][500 targets][500 links][100 filler]

    while not self.replication_complete():
        self.repl_get_next()

    self.assertTrue(self.used_get_tgt,
                    "Test didn't use the GET_TGT flag as expected")

    # check we received all the expected objects/links
    self.assert_expected_data(expected_objects)
    self.assert_expected_links([la_source], link_attr="addressBookRoots2",
                               num_expected=500)
1152 """Helper class to track a connection to another DC"""
def __init__(self, drs_base, ldb_dc, dnsname_dc):
    """
    Binds to the given DC and records the state needed to talk to it.

    :param drs_base: object providing _ds_bind() and
        _get_highest_hwm_utdv() (the test case passes itself)
    :param ldb_dc: LDB connection to the DC
    :param dnsname_dc: DNS name of the DC, used for the DRS bind
    """
    self.ldb_dc = ldb_dc

    # the DRS pipe and the bind handle come from the same bind call
    bind_result = drs_base._ds_bind(dnsname_dc)
    self.drs, self.drs_handle = bind_result

    # the DC's HWM/UTDV as returned at connection time
    hwm, utdv = drs_base._get_highest_hwm_utdv(ldb_dc)
    self.default_hwm = hwm
    self.default_utdv = utdv