d545fe07e584b0f20d6e8446fa7ed32e6b72d4db
[kai/samba-autobuild/.git] / source4 / torture / drs / python / getncchanges.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Catalyst.Net Ltd. 2017
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 from __future__ import print_function
31 import drs_base
32 import samba.tests
33 import ldb
34 from ldb import SCOPE_BASE
35 import random
36
37 from samba.dcerpc import drsuapi
38
39 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
40     def setUp(self):
41         super(DrsReplicaSyncIntegrityTestCase, self).setUp()
42
43         self.init_test_state()
44
45         # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
46         # the vampire_dc), so we point this test directly at that DC
47         self.set_test_ldb_dc(self.ldb_dc2)
48
49         self.ou = samba.tests.create_test_ou(self.test_ldb_dc, "getncchanges")
50         self.base_dn = self.test_ldb_dc.get_default_basedn()
51
52         self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
53         self.set_dc_connection(self.default_conn)
54
55     def tearDown(self):
56         super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
57         # tidyup groups and users
58         try:
59             self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
60         except ldb.LdbError as e:
61             (enum, string) = e.args
62             if enum == ldb.ERR_NO_SUCH_OBJECT:
63                 pass
64
65     def init_test_state(self):
66         self.rxd_dn_list = []
67         self.rxd_links = []
68         self.rxd_guids = []
69         self.last_ctr = None
70
71         # 100 is the minimum max_objects that Microsoft seems to honour
72         # (the max honoured is 400ish), so we use that in these tests
73         self.max_objects = 100
74
75         # store whether we used GET_TGT/GET_ANC flags in the requests
76         self.used_get_tgt = False
77         self.used_get_anc = False
78
79     def add_object(self, dn, objectclass="organizationalunit"):
80         """Adds an OU object"""
81         self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
82         res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
83         self.assertEquals(len(res), 1)
84
85     def modify_object(self, dn, attr, value):
86         """Modifies an object's USN by adding an attribute value to it"""
87         m = ldb.Message()
88         m.dn = ldb.Dn(self.test_ldb_dc, dn)
89         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
90         self.test_ldb_dc.modify(m)
91
92     def delete_attribute(self, dn, attr, value):
93         """Deletes an attribute from an object"""
94         m = ldb.Message()
95         m.dn = ldb.Dn(self.test_ldb_dc, dn)
96         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
97         self.test_ldb_dc.modify(m)
98
99     def start_new_repl_cycle(self):
100         """Resets enough state info to start a new replication cycle"""
101         # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
102         # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve
103         self.rxd_links = []
104
105         self.used_get_tgt = False
106         self.used_get_anc = False
107         # mostly preserve self.last_ctr, so that we use the last HWM
108         if self.last_ctr is not None:
109             self.last_ctr.more_data = True
110
111     def create_object_range(self, start, end, prefix="",
112                             children=None, parent_list=None):
113         """
114         Creates a block of objects. Object names are numbered sequentially,
115         using the optional prefix supplied. If the children parameter is
116         supplied it will create a parent-child hierarchy and return the
117         top-level parents separately.
118         """
119         dn_list = []
120
121         # Use dummy/empty lists if we're not creating a parent/child hierarchy
122         if children is None:
123             children = []
124
125         if parent_list is None:
126             parent_list = []
127
128         # Create the parents first, then the children.
129         # This makes it easier to see in debug when GET_ANC takes effect
130         # because the parent/children become interleaved (by default,
131         # this approach means the objects are organized into blocks of
132         # parents and blocks of children together)
133         for x in range(start, end):
134             ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
135             self.add_object(ou)
136             dn_list.append(ou)
137
138             # keep track of the top-level parents (if needed)
139             parent_list.append(ou)
140
141         # create the block of children (if needed)
142         for x in range(start, end):
143             for child in children:
144                 ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
145                 self.add_object(ou)
146                 dn_list.append(ou)
147
148         return dn_list
149
150     def assert_expected_data(self, expected_list):
151         """
152         Asserts that we received all the DNs that we expected and
153         none are missing.
154         """
155         received_list = self.rxd_dn_list
156
157         # Note that with GET_ANC Windows can end up sending the same parent
158         # object multiple times, so this might be noteworthy but doesn't
159         # warrant failing the test
160         if (len(received_list) != len(expected_list)):
161             print("Note: received %d objects but expected %d" %(len(received_list),
162                                                                 len(expected_list)))
163
164         # Check that we received every object that we were expecting
165         for dn in expected_list:
166             self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
167
168     def test_repl_integrity(self):
169         """
170         Modify the objects being replicated while the replication is still
171         in progress and check that no object loss occurs.
172         """
173
174         # The server behaviour differs between samba and Windows. Samba returns
175         # the objects in the original order (up to the pre-modify HWM). Windows
176         # incorporates the modified objects and returns them in the new order
177         # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
178         # docs state the Windows behaviour is optional.
179
180         # Create a range of objects to replicate.
181         expected_dn_list = self.create_object_range(0, 400)
182         (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
183
184         # We ask for the first page of 100 objects.
185         # For this test, we don't care what order we receive the objects in,
186         # so long as by the end we've received everything
187         self.repl_get_next()
188
189         # Modify some of the second page of objects. This should bump the highwatermark
190         for x in range(100, 200):
191             self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
192
193         (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
194         self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
195
196         # Get the remaining blocks of data
197         while not self.replication_complete():
198             self.repl_get_next()
199
200         # Check we still receive all the objects we're expecting
201         self.assert_expected_data(expected_dn_list)
202
203     def is_parent_known(self, dn, known_dn_list):
204         """
205         Returns True if the parent of the dn specified is in known_dn_list
206         """
207
208         # we can sometimes get system objects like the RID Manager returned.
209         # Ignore anything that is not under the test OU we created
210         if self.ou not in dn:
211             return True
212
213         # Remove the child portion from the name to get the parent's DN
214         name_substrings = dn.split(",")
215         del name_substrings[0]
216
217         parent_dn = ",".join(name_substrings)
218
219         # check either this object is a parent (it's parent is the top-level
220         # test object), or its parent has been seen previously
221         return parent_dn == self.ou or parent_dn in known_dn_list
222
223     def _repl_send_request(self, get_anc=False, get_tgt=False):
224         """Sends a GetNCChanges request for the next block of replication data."""
225
226         # we're just trying to mimic regular client behaviour here, so just
227         # use the highwatermark in the last response we received
228         if self.last_ctr:
229             highwatermark = self.last_ctr.new_highwatermark
230             uptodateness_vector = self.last_ctr.uptodateness_vector
231         else:
232             # this is the first replication chunk
233             highwatermark = None
234             uptodateness_vector = None
235
236         # Ask for the next block of replication data
237         replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
238         more_flags = 0
239
240         if get_anc:
241             replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC
242             self.used_get_anc = True
243
244         if get_tgt:
245             more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
246             self.used_get_tgt = True
247
248         # return the response from the DC
249         return self._get_replication(replica_flags,
250                                      max_objects=self.max_objects,
251                                      highwatermark=highwatermark,
252                                      uptodateness_vector=uptodateness_vector,
253                                      more_flags=more_flags)
254
255     def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
256         """
257         Requests the next block of replication data. This tries to simulate
258         client behaviour - if we receive a replicated object that we don't know
259         the parent of, then re-request the block with the GET_ANC flag set.
260         If we don't know the target object for a linked attribute, then
261         re-request with GET_TGT.
262         """
263
264         # send a request to the DC and get the response
265         ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
266
267         # extract the object DNs and their GUIDs from the response
268         rxd_dn_list = self._get_ctr6_dn_list(ctr6)
269         rxd_guid_list = self._get_ctr6_object_guids(ctr6)
270
271         # we'll add new objects as we discover them, so take a copy of the
272         # ones we already know about, so we can modify these lists safely
273         known_objects = self.rxd_dn_list[:]
274         known_guids = self.rxd_guids[:]
275
276         # check that we know the parent for every object received
277         for i in range(0, len(rxd_dn_list)):
278
279             dn = rxd_dn_list[i]
280             guid = rxd_guid_list[i]
281
282             if self.is_parent_known(dn, known_objects):
283
284                 # the new DN is now known so add it to the list.
285                 # It may be the parent of another child in this block
286                 known_objects.append(dn)
287                 known_guids.append(guid)
288             else:
289                 # If we've already set the GET_ANC flag then it should mean
290                 # we receive the parents before the child
291                 self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
292
293                 print("Unknown parent for %s - try GET_ANC" % dn)
294
295                 # try the same thing again with the GET_ANC flag set this time
296                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
297                                           assert_links=assert_links)
298
299         # check we know about references to any objects in the linked attritbutes
300         received_links = self._get_ctr6_links(ctr6)
301
302         # This is so that older versions of Samba fail - we want the links to be
303         # sent roughly with the objects, rather than getting all links at the end
304         if assert_links:
305             self.assertTrue(len(received_links) > 0,
306                             "Links were expected in the GetNCChanges response")
307
308         for link in received_links:
309
310             # skip any links that aren't part of the test
311             if self.ou not in link.targetDN:
312                 continue
313
314             # check the source object is known (Windows can actually send links
315             # where we don't know the source object yet). Samba shouldn't ever
316             # hit this case because it gets the links based on the source
317             if link.identifier not in known_guids:
318
319                 # If we've already set the GET_ANC flag then it should mean
320                 # this case doesn't happen
321                 self.assertFalse(get_anc, "Unknown source object for GUID %s"
322                                  % link.identifier)
323
324                 print("Unknown source GUID %s - try GET_ANC" % link.identifier)
325
326                 # try the same thing again with the GET_ANC flag set this time
327                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
328                                           assert_links=assert_links)
329
330             # check we know the target object
331             if link.targetGUID not in known_guids:
332
333                 # If we've already set the GET_TGT flag then we should have
334                 # already received any objects we need to know about
335                 self.assertFalse(get_tgt, "Unknown linked target for object %s"
336                                  % link.targetDN)
337
338                 print("Unknown target for %s - try GET_TGT" % link.targetDN)
339
340                 # try the same thing again with the GET_TGT flag set this time
341                 return self.repl_get_next(get_anc=get_anc, get_tgt=True,
342                                           assert_links=assert_links)
343
344         # store the last successful result so we know what HWM to request next
345         self.last_ctr = ctr6
346
347         # store the objects, GUIDs, and links we received
348         self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
349         self.rxd_links += self._get_ctr6_links(ctr6)
350         self.rxd_guids += self._get_ctr6_object_guids(ctr6)
351
352         return ctr6
353
354     def replication_complete(self):
355         """Returns True if the current/last replication cycle is complete"""
356
357         if self.last_ctr is None or self.last_ctr.more_data:
358             return False
359         else:
360             return True
361
362     def test_repl_integrity_get_anc(self):
363         """
364         Modify the parent objects being replicated while the replication is still
365         in progress (using GET_ANC) and check that no object loss occurs.
366         """
367
368         # Note that GET_ANC behaviour varies between Windows and Samba.
369         # On Samba GET_ANC results in the replication restarting from the very
370         # beginning. After that, Samba remembers GET_ANC and also sends the
371         # parents in subsequent requests (regardless of whether GET_ANC is
372         # specified in the later request).
373         # Windows only sends the parents if GET_ANC was specified in the last
374         # request. It will also resend a parent, even if it's already sent the
375         # parent in a previous response (whereas Samba doesn't).
376
377         # Create a small block of 50 parents, each with 2 children (A and B)
378         # This is so that we receive some children in the first block, so we
379         # can resend with GET_ANC before we learn too many parents
380         parent_dn_list = []
381         expected_dn_list = self.create_object_range(0, 50, prefix="parent",
382                                                     children=("A", "B"),
383                                                     parent_list=parent_dn_list)
384
385         # create the remaining parents and children
386         expected_dn_list += self.create_object_range(50, 150, prefix="parent",
387                                                      children=("A", "B"),
388                                                      parent_list=parent_dn_list)
389
390         # We've now got objects in the following order:
391         # [50 parents][100 children][100 parents][200 children]
392
393         # Modify the first parent so that it's now ordered last by USN
394         # This means we set the GET_ANC flag pretty much straight away
395         # because we receive the first child before the first parent
396         self.modify_object(parent_dn_list[0], "displayName", "OU0")
397
398         # modify a later block of parents so they also get reordered
399         for x in range(50, 100):
400             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
401
402         # Get the first block of objects - this should resend the request with
403         # GET_ANC set because we won't know about the first child's parent.
404         # On samba GET_ANC essentially starts the sync from scratch again, so
405         # we get this over with early before we learn too many parents
406         self.repl_get_next()
407
408         # modify the last chunk of parents. They should now have a USN higher
409         # than the highwater-mark for the replication cycle
410         for x in range(100, 150):
411             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
412
413         # Get the remaining blocks of data - this will resend the request with
414         # GET_ANC if it encounters an object it doesn't have the parent for.
415         while not self.replication_complete():
416             self.repl_get_next()
417
418         # The way the test objects have been created should force
419         # self.repl_get_next() to use the GET_ANC flag. If this doesn't
420         # actually happen, then the test isn't doing its job properly
421         self.assertTrue(self.used_get_anc,
422                         "Test didn't use the GET_ANC flag as expected")
423
424         # Check we get all the objects we're expecting
425         self.assert_expected_data(expected_dn_list)
426
427     def assert_expected_links(self, objects_with_links, link_attr="managedBy",
428                               num_expected=None):
429         """
430         Asserts that a GetNCChanges response contains any expected links
431         for the objects it contains.
432         """
433         received_links = self.rxd_links
434
435         if num_expected is None:
436             num_expected = len(objects_with_links)
437
438         self.assertTrue(len(received_links) == num_expected,
439                         "Received %d links but expected %d"
440                         %(len(received_links), num_expected))
441
442         for dn in objects_with_links:
443             self.assert_object_has_link(dn, link_attr, received_links)
444
445     def assert_object_has_link(self, dn, link_attr, received_links):
446         """
447         Queries the object in the DB and asserts there is a link in the
448         GetNCChanges response that matches.
449         """
450
451         # Look up the link attribute in the DB
452         # The extended_dn option will dump the GUID info for the link
453         # attribute (as a hex blob)
454         res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr],
455                                       controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE)
456
457         # We didn't find the expected link attribute in the DB for the object.
458         # Something has gone wrong somewhere...
459         self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s"
460                         %(dn, link_attr))
461
462         # find the received link in the list and assert that the target and
463         # source GUIDs match what's in the DB
464         for val in res[0][link_attr]:
465             # Work out the expected source and target GUIDs for the DB link
466             target_dn = ldb.Dn(self.test_ldb_dc, val)
467             targetGUID_blob = target_dn.get_extended_component("GUID")
468             sourceGUID_blob = res[0].dn.get_extended_component("GUID")
469
470             found = False
471
472             for link in received_links:
473                 if link.selfGUID_blob == sourceGUID_blob and \
474                    link.targetGUID_blob == targetGUID_blob:
475
476                     found = True
477
478                     if self._debug:
479                         print("Link %s --> %s" %(dn[:25], link.targetDN[:25]))
480                     break
481
482             self.assertTrue(found, "Did not receive expected link for DN %s" % dn)
483
484     def test_repl_get_tgt(self):
485         """
486         Creates a scenario where we should receive the linked attribute before
487         we know about the target object, and therefore need to use GET_TGT.
488         Note: Samba currently avoids this problem by sending all its links last
489         """
490
491         # create the test objects
492         reportees = self.create_object_range(0, 100, prefix="reportee")
493         managers = self.create_object_range(0, 100, prefix="manager")
494         all_objects = managers + reportees
495         expected_links = reportees
496
497         # add a link attribute to each reportee object that points to the
498         # corresponding manager object as the target
499         for i in range(0, 100):
500             self.modify_object(reportees[i], "managedBy", managers[i])
501
502         # touch the managers (the link-target objects) again to make sure the
503         # reportees (link source objects) get returned first by the replication
504         for i in range(0, 100):
505             self.modify_object(managers[i], "displayName", "OU%d" % i)
506
507         links_expected = True
508
509         # Get all the replication data - this code should resend the requests
510         # with GET_TGT
511         while not self.replication_complete():
512
513             # get the next block of replication data (this sets GET_TGT if needed)
514             self.repl_get_next(assert_links=links_expected)
515             links_expected = len(self.rxd_links) < len(expected_links)
516
517         # The way the test objects have been created should force
518         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
519         # actually happen, then the test isn't doing its job properly
520         self.assertTrue(self.used_get_tgt,
521                         "Test didn't use the GET_TGT flag as expected")
522
523         # Check we get all the objects we're expecting
524         self.assert_expected_data(all_objects)
525
526         # Check we received links for all the reportees
527         self.assert_expected_links(expected_links)
528
529     def test_repl_get_tgt_chain(self):
530         """
531         Tests the behaviour of GET_TGT with a more complicated scenario.
532         Here we create a chain of objects linked together, so if we follow
533         the link target, then we'd traverse ~200 objects each time.
534         """
535
536         # create the test objects
537         objectsA = self.create_object_range(0, 100, prefix="AAA")
538         objectsB = self.create_object_range(0, 100, prefix="BBB")
539         objectsC = self.create_object_range(0, 100, prefix="CCC")
540
541         # create a complex set of object links:
542         #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
543         # Basically each object-A should link to a circular chain of 200 B/C
544         # objects. We create the links in separate chunks here, as it makes it
545         # clearer what happens with the USN (links on Windows have their own
546         # USN, so this approach means the A->B/B->C links aren't interleaved)
547         for i in range(0, 100):
548             self.modify_object(objectsA[i], "managedBy", objectsB[i])
549
550         for i in range(0, 100):
551             self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100])
552
553         for i in range(0, 100):
554             self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100])
555
556         all_objects = objectsA + objectsB + objectsC
557         expected_links = all_objects
558
559         # the default order the objects now get returned in should be:
560         # [A0-A99][B0-B99][C0-C99]
561
562         links_expected = True
563
564         # Get all the replication data - this code should resend the requests
565         # with GET_TGT
566         while not self.replication_complete():
567
568             # get the next block of replication data (this sets GET_TGT if needed)
569             self.repl_get_next(assert_links=links_expected)
570             links_expected = len(self.rxd_links) < len(expected_links)
571
572         # The way the test objects have been created should force
573         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
574         # actually happen, then the test isn't doing its job properly
575         self.assertTrue(self.used_get_tgt,
576                         "Test didn't use the GET_TGT flag as expected")
577
578         # Check we get all the objects we're expecting
579         self.assert_expected_data(all_objects)
580
581         # Check we received links for all the reportees
582         self.assert_expected_links(expected_links)
583
584     def test_repl_integrity_link_attr(self):
585         """
586         Tests adding links to new objects while a replication is in progress.
587         """
588
589         # create some source objects for the linked attributes, sandwiched
590         # between 2 blocks of filler objects
591         filler = self.create_object_range(0, 100, prefix="filler")
592         reportees = self.create_object_range(0, 100, prefix="reportee")
593         filler += self.create_object_range(100, 200, prefix="filler")
594
595         # Start the replication and get the first block of filler objects
596         # (We're being mean here and setting the GET_TGT flag right from the
597         # start. On earlier Samba versions, if the client encountered an
598         # unknown target object and retried with GET_TGT, it would restart the
599         # replication cycle from scratch, which avoids the problem).
600         self.repl_get_next(get_tgt=True)
601
602         # create the target objects and add the links. These objects should be
603         # outside the scope of the Samba replication cycle, but the links should
604         # still get sent with the source object
605         managers = self.create_object_range(0, 100, prefix="manager")
606
607         for i in range(0, 100):
608             self.modify_object(reportees[i], "managedBy", managers[i])
609
610         expected_objects = managers + reportees + filler
611         expected_links = reportees
612
613         # complete the replication
614         while not self.replication_complete():
615             self.repl_get_next(get_tgt=True)
616
617         # If we didn't receive the most recently created objects in the last
618         # replication cycle, then kick off another replication to get them
619         if len(self.rxd_dn_list) < len(expected_objects):
620             self.repl_get_next()
621
622             while not self.replication_complete():
623                 self.repl_get_next()
624
625         # Check we get all the objects we're expecting
626         self.assert_expected_data(expected_objects)
627
628         # Check we received links for all the parents
629         self.assert_expected_links(expected_links)
630
631     def test_repl_get_anc_link_attr(self):
632         """
633         A basic GET_ANC test where the parents have linked attributes
634         """
635
636         # Create a block of 100 parents and 100 children
637         parent_dn_list = []
638         expected_dn_list = self.create_object_range(0, 100, prefix="parent",
639                                                     children=("A"),
640                                                     parent_list=parent_dn_list)
641
642         # Add links from the parents to the children
643         for x in range(0, 100):
644             self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100])
645
646         # add some filler objects at the end. This allows us to easily see
647         # which chunk the links get sent in
648         expected_dn_list += self.create_object_range(0, 100, prefix="filler")
649
650         # We've now got objects in the following order:
651         # [100 x children][100 x parents][100 x filler]
652
653         # Get the replication data - because the block of children come first,
654         # this should retry the request with GET_ANC
655         while not self.replication_complete():
656             self.repl_get_next()
657
658         self.assertTrue(self.used_get_anc,
659                         "Test didn't use the GET_ANC flag as expected")
660
661         # Check we get all the objects we're expecting
662         self.assert_expected_data(expected_dn_list)
663
664         # Check we received links for all the parents
665         self.assert_expected_links(parent_dn_list)
666
667     def test_repl_get_tgt_and_anc(self):
668         """
669         Check we can resolve an unknown ancestor when fetching the link target,
670         i.e. tests using GET_TGT and GET_ANC in combination
671         """
672
673         # Create some parent/child objects (the child will be the link target)
674         parents = []
675         all_objects = self.create_object_range(0, 100, prefix="parent",
676                                                children=["la_tgt"],
677                                                parent_list=parents)
678
679         children = [item for item in all_objects if item not in parents]
680
681         # create the link source objects and link them to the child/target
682         la_sources = self.create_object_range(0, 100, prefix="la_src")
683         all_objects += la_sources
684
685         for i in range(0, 100):
686             self.modify_object(la_sources[i], "managedBy", children[i])
687
688         expected_links = la_sources
689
690         # modify the children/targets so they come after the link source
691         for x in range(0, 100):
692             self.modify_object(children[x], "displayName", "OU%d" % x)
693
694         # modify the parents, so they now come last in the replication
695         for x in range(0, 100):
696             self.modify_object(parents[x], "displayName", "OU%d" % x)
697
698         # We've now got objects in the following order:
699         # [100 la_source][100 la_target][100 parents (of la_target)]
700
701         links_expected = True
702
703         # Get all the replication data - this code should resend the requests
704         # with GET_TGT and GET_ANC
705         while not self.replication_complete():
706
707             # get the next block of replication data (this sets GET_TGT/GET_ANC)
708             self.repl_get_next(assert_links=links_expected)
709             links_expected = len(self.rxd_links) < len(expected_links)
710
711         # The way the test objects have been created should force
712         # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
713         # doesn't actually happen, then the test isn't doing its job properly
714         self.assertTrue(self.used_get_tgt,
715                         "Test didn't use the GET_TGT flag as expected")
716         self.assertTrue(self.used_get_anc,
717                         "Test didn't use the GET_ANC flag as expected")
718
719         # Check we get all the objects we're expecting
720         self.assert_expected_data(all_objects)
721
722         # Check we received links for all the link sources
723         self.assert_expected_links(expected_links)
724
725         # Second part of test. Add some extra objects and kick off another
726         # replication. The test code will use the HWM from the last replication
727         # so we'll only receive the objects we modify below
728         self.start_new_repl_cycle()
729
730         # add an extra level of grandchildren that hang off a child
731         # that got created last time
732         new_parent = "OU=test_new_parent,%s" % children[0]
733         self.add_object(new_parent)
734         new_children = []
735
736         for x in range(0, 50):
737             dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
738             self.add_object(dn)
739             new_children.append(dn)
740
741         # replace half of the links to point to the new children
742         for x in range(0, 50):
743             self.delete_attribute(la_sources[x], "managedBy", children[x])
744             self.modify_object(la_sources[x], "managedBy", new_children[x])
745
746         # add some filler objects to fill up the 1st chunk
747         filler = self.create_object_range(0, 100, prefix="filler")
748
749         # modify the new children/targets so they come after the link source
750         for x in range(0, 50):
751             self.modify_object(new_children[x], "displayName", "OU-%d" % x)
752
753         # modify the parent, so it now comes last in the replication
754         self.modify_object(new_parent, "displayName", "OU%d" % x)
755
756         # We should now get the modified objects in the following order:
757         # [50 links (x 2)][100 filler][50 new children][new parent]
758         # Note that the link sources aren't actually sent (their new linked
759         # attributes are sent, but apart from that, nothing has changed)
760         all_objects = filler + new_children + [new_parent]
761         expected_links = la_sources[:50]
762
763         links_expected = True
764
765         while not self.replication_complete():
766             self.repl_get_next(assert_links=links_expected)
767             links_expected = len(self.rxd_links) < len(expected_links)
768
769         self.assertTrue(self.used_get_tgt,
770                         "Test didn't use the GET_TGT flag as expected")
771         self.assertTrue(self.used_get_anc,
772                         "Test didn't use the GET_ANC flag as expected")
773
774         # Check we get all the objects we're expecting
775         self.assert_expected_data(all_objects)
776
777         # Check we received links (50 deleted links and 50 new)
778         self.assert_expected_links(expected_links, num_expected=100)
779
780     def _repl_integrity_obj_deletion(self, delete_link_source=True):
781         """
782         Tests deleting link objects while a replication is in progress.
783         """
784
785         # create some objects and link them together, with some filler
786         # object in between the link sources
787         la_sources = self.create_object_range(0, 100, prefix="la_source")
788         la_targets = self.create_object_range(0, 100, prefix="la_targets")
789
790         for i in range(0, 50):
791             self.modify_object(la_sources[i], "managedBy", la_targets[i])
792
793         filler = self.create_object_range(0, 100, prefix="filler")
794
795         for i in range(50, 100):
796             self.modify_object(la_sources[i], "managedBy", la_targets[i])
797
798         # touch the targets so that the sources get replicated first
799         for i in range(0, 100):
800             self.modify_object(la_targets[i], "displayName", "OU%d" % i)
801
802         # objects should now be in the following USN order:
803         # [50 la_source][100 filler][50 la_source][100 la_target]
804
805         # Get the first block containing 50 link sources
806         self.repl_get_next()
807
808         # delete either the link targets or link source objects
809         if delete_link_source:
810             objects_to_delete = la_sources
811             # in GET_TGT testenvs we only receive the first 50 source objects
812             expected_objects = la_sources[:50] + la_targets + filler
813         else:
814             objects_to_delete = la_targets
815             expected_objects = la_sources + filler
816
817         for obj in objects_to_delete:
818             self.ldb_dc2.delete(obj)
819
820         # complete the replication
821         while not self.replication_complete():
822             self.repl_get_next()
823
824         # Check we get all the objects we're expecting
825         self.assert_expected_data(expected_objects)
826
827         # we can't use assert_expected_links() here because it tries to check
828         # against the deleted objects on the DC. (Although we receive some
829         # links from the first block processed, the Samba client should end up
830         # deleting these, as the source/target object involved is deleted)
831         self.assertTrue(len(self.rxd_links) == 50,
832                         "Expected 50 links, not %d" % len(self.rxd_links))
833
834     def test_repl_integrity_src_obj_deletion(self):
835         self._repl_integrity_obj_deletion(delete_link_source=True)
836
837     def test_repl_integrity_tgt_obj_deletion(self):
838         self._repl_integrity_obj_deletion(delete_link_source=False)
839
840     def restore_deleted_object(self, guid, new_dn):
841         """Re-animates a deleted object"""
842
843         res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"],
844                                   controls=['show_deleted:1'], scope=ldb.SCOPE_BASE)
845         if len(res) != 1:
846             return
847
848         msg = ldb.Message()
849         msg.dn = res[0].dn
850         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
851         msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
852         self.test_ldb_dc.modify(msg, ["show_deleted:1"])
853
854     def sync_DCs(self, nc_dn=None):
855         # make sure DC1 has all the changes we've made to DC2
856         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn)
857
858     def get_object_guid(self, dn):
859         res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
860         return res[0]['objectGUID'][0]
861
862
863     def set_dc_connection(self, conn):
864         """
865         Switches over the connection state info that the underlying drs_base
866         class uses so that we replicate with a different DC.
867         """
868         self.default_hwm = conn.default_hwm
869         self.default_utdv = conn.default_utdv
870         self.drs = conn.drs
871         self.drs_handle = conn.drs_handle
872         self.set_test_ldb_dc(conn.ldb_dc)
873
874     def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
875                                              expected_links):
876         """
877         Replicates against both the primary and secondary DCs in the testenv
878         and checks that both return the expected results.
879         """
880         print("Checking replication against primary test DC...")
881
882         # get the replication data from the test DC first
883         while not self.replication_complete():
884             self.repl_get_next()
885
886         # Check we get all the objects and links we're expecting
887         self.assert_expected_data(all_objects)
888         self.assert_expected_links(expected_links)
889
890         # switch over the DC state info so we now talk to the peer DC
891         self.set_dc_connection(peer_conn)
892         self.init_test_state()
893
894         print("Checking replication against secondary test DC...")
895
896         # check that we get the same information from the 2nd DC
897         while not self.replication_complete():
898             self.repl_get_next()
899
900         self.assert_expected_data(all_objects)
901         self.assert_expected_links(expected_links)
902
903         # switch back to using the default connection
904         self.set_dc_connection(self.default_conn)
905
906     def test_repl_integrity_obj_reanimation(self):
907         """
908         Checks receiving links for a re-animated object doesn't lose links.
909         We test this against the peer DC to make sure it doesn't drop links.
910         """
911
912         # This test is a little different in that we're particularly interested
913         # in exercising the replmd client code on the second DC.
914         # First, make sure the peer DC has the base OU, then connect to it (so
915         # we store its inital HWM)
916         self.sync_DCs()
917         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
918
919         # create the link source/target objects
920         la_sources = self.create_object_range(0, 100, prefix="la_src")
921         la_targets = self.create_object_range(0, 100, prefix="la_tgt")
922
923         # store the target object's GUIDs (we need to know these to reanimate them)
924         target_guids = []
925
926         for dn in la_targets:
927             target_guids.append(self.get_object_guid(dn))
928
929         # delete the link target
930         for x in range(0, 100):
931             self.ldb_dc2.delete(la_targets[x])
932
933         # sync the DCs, then disable replication. We want the peer DC to get
934         # all the following changes in a single replication cycle
935         self.sync_DCs()
936         self._disable_all_repl(self.dnsname_dc2)
937
938         # restore the target objects for the linked attributes again
939         for x in range(0, 100):
940             self.restore_deleted_object(target_guids[x], la_targets[x])
941
942         # add the links
943         for x in range(0, 100):
944             self.modify_object(la_sources[x], "managedBy", la_targets[x])
945
946         # create some additional filler objects
947         filler = self.create_object_range(0, 100, prefix="filler")
948
949         # modify the targets so they now come last
950         for x in range(0, 100):
951             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
952
953         # the objects should now be sent in the following order:
954         # [la sources + links][filler][la targets]
955         all_objects = la_sources + la_targets + filler
956         expected_links = la_sources
957
958         # Enable replication again make sure the 2 DCs are back in sync
959         self._enable_all_repl(self.dnsname_dc2)
960         self.sync_DCs()
961
962         # Get the replication data from each DC in turn.
963         # Check that both give us all the objects and links we're expecting,
964         # i.e. no links were lost
965         self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
966                                                   expected_links)
967
968     def test_repl_integrity_cross_partition_links(self):
969         """
970         Checks that a cross-partition link to an unknown target object does
971         not result in missing links.
972         """
973
974         # check the peer DC is up-to-date, then connect (storing its HWM)
975         self.sync_DCs()
976         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
977
978         # stop replication so the peer gets the following objects in one go
979         self._disable_all_repl(self.dnsname_dc2)
980
981         # create a link source object in the main NC
982         la_source = "OU=cross_nc_src,%s" % self.ou
983         self.add_object(la_source)
984
985         # create the link target (a server object) in the config NC
986         rand = random.randint(1, 10000000)
987         la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \
988                     "CN=Sites,%s" %(rand, self.config_dn)
989         self.add_object(la_target, objectclass="server")
990
991         # add a cross-partition link between the two
992         self.modify_object(la_source, "managedBy", la_target)
993
994         # First, sync across to the peer the NC containing the link source object
995         self.sync_DCs()
996
997         # Now, before the peer has received the partition containing the target
998         # object, try replicating from the peer. It will only know about half
999         # of the link at this point, but it should be a valid scenario
1000         self.set_dc_connection(peer_conn)
1001
1002         while not self.replication_complete():
1003             # pretend we've received other link targets out of order and that's
1004             # forced us to use GET_TGT. This checks the peer doesn't fail trying
1005             # to fetch a cross-partition target object that doesn't exist
1006             self.repl_get_next(get_tgt=True)
1007
1008         self.set_dc_connection(self.default_conn)
1009         self.init_test_state()
1010
1011         # Now sync across the partition containing the link target object
1012         self.sync_DCs(nc_dn=self.config_dn)
1013         self._enable_all_repl(self.dnsname_dc2)
1014
1015         # Get the replication data from each DC in turn.
1016         # Check that both return the cross-partition link (note we're not
1017         # checking the config domain NC here for simplicity)
1018         self.assert_DCs_replication_is_consistent(peer_conn,
1019                                                   all_objects=[la_source],
1020                                                   expected_links=[la_source])
1021
1022         # the cross-partition linked attribute has a missing backlink. Check
1023         # that we can still delete it successfully
1024         self.delete_attribute(la_source, "managedBy", la_target)
1025         self.sync_DCs()
1026
1027         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1028                                       attrs=["managedBy"],
1029                                       controls=['extended_dn:1:0'],
1030                                       scope=ldb.SCOPE_BASE)
1031         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1032                          % la_source)
1033         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1034                                       attrs=["managedBy"],
1035                                       controls=['extended_dn:1:0'],
1036                                       scope=ldb.SCOPE_BASE)
1037         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1038                          % la_source)
1039
1040         # Check receiving a cross-partition link to a deleted target.
1041         # Delete the target and make sure the deletion is sync'd between DCs
1042         target_guid = self.get_object_guid(la_target)
1043         self.test_ldb_dc.delete(la_target)
1044         self.sync_DCs(nc_dn=self.config_dn)        
1045         self._disable_all_repl(self.dnsname_dc2)
1046
1047         # re-animate the target
1048         self.restore_deleted_object(target_guid, la_target)
1049         self.modify_object(la_source, "managedBy", la_target)
1050
1051         # now sync the link - because the target is in another partition, the
1052         # peer DC receives a link for a deleted target, which it should accept
1053         self.sync_DCs()
1054         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1055                                       attrs=["managedBy"],
1056                                       controls=['extended_dn:1:0'],
1057                                       scope=ldb.SCOPE_BASE)
1058         self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute"
1059                         % la_source)
1060
1061         # cleanup the server object we created in the Configuration partition
1062         self.test_ldb_dc.delete(la_target)
1063         self._enable_all_repl(self.dnsname_dc2)
1064
1065     def test_repl_get_tgt_multivalued_links(self):
1066         """Tests replication with multi-valued link attributes."""
1067
1068         # create the target/source objects and link them together
1069         la_targets = self.create_object_range(0, 500, prefix="la_tgt")
1070         la_source = "CN=la_src,%s" % self.ou
1071         self.add_object(la_source, objectclass="msExchConfigurationContainer")
1072
1073         for tgt in la_targets:
1074             self.modify_object(la_source, "addressBookRoots2", tgt)
1075
1076         filler = self.create_object_range(0, 100, prefix="filler")
1077
1078         # We should receive the objects/links in the following order:
1079         # [500 targets + 1 source][500 links][100 filler]
1080         expected_objects = la_targets + [la_source] + filler
1081         link_only_chunk = False
1082
1083         # First do the replication without needing GET_TGT
1084         while not self.replication_complete():
1085             ctr6 = self.repl_get_next()
1086
1087             if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
1088                 link_only_chunk = True
1089
1090         # we should receive one chunk that contains only links
1091         self.assertTrue(link_only_chunk,
1092                         "Expected to receive a chunk containing only links")
1093
1094         # check we received all the expected objects/links
1095         self.assert_expected_data(expected_objects)
1096         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1097
1098         # Do the replication again, forcing the use of GET_TGT this time
1099         self.init_test_state()
1100
1101         for x in range(0, 500):
1102             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
1103
1104         # The objects/links should get sent in the following order:
1105         # [1 source][500 targets][500 links][100 filler]
1106
1107         while not self.replication_complete():
1108             ctr6 = self.repl_get_next()
1109
1110         self.assertTrue(self.used_get_tgt,
1111                         "Test didn't use the GET_TGT flag as expected")
1112
1113         # check we received all the expected objects/links
1114         self.assert_expected_data(expected_objects)
1115         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1116
1117
1118 class DcConnection:
1119     """Helper class to track a connection to another DC"""
1120
1121     def __init__(self, drs_base, ldb_dc, dnsname_dc):
1122         self.ldb_dc = ldb_dc
1123         (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1124         (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1125
1126