Fix PEP8 warning E303 too many blank lines
[kai/samba-autobuild/.git] / source4 / torture / drs / python / getncchanges.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Catalyst.Net Ltd. 2017
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 from __future__ import print_function
31 import drs_base
32 import samba.tests
33 import ldb
34 from ldb import SCOPE_BASE
35 import random
36
37 from samba.dcerpc import drsuapi
38
39
40 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
41     def setUp(self):
42         super(DrsReplicaSyncIntegrityTestCase, self).setUp()
43
44         self.init_test_state()
45
46         # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
47         # the vampire_dc), so we point this test directly at that DC
48         self.set_test_ldb_dc(self.ldb_dc2)
49
50         self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
51                                                  "getncchanges"))
52         self.base_dn = self.test_ldb_dc.get_default_basedn()
53
54         self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
55         self.set_dc_connection(self.default_conn)
56
57     def tearDown(self):
58         super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
59         # tidyup groups and users
60         try:
61             self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
62         except ldb.LdbError as e:
63             (enum, string) = e.args
64             if enum == ldb.ERR_NO_SUCH_OBJECT:
65                 pass
66
67     def init_test_state(self):
68         self.rxd_dn_list = []
69         self.rxd_links = []
70         self.rxd_guids = []
71         self.last_ctr = None
72
73         # 100 is the minimum max_objects that Microsoft seems to honour
74         # (the max honoured is 400ish), so we use that in these tests
75         self.max_objects = 100
76
77         # store whether we used GET_TGT/GET_ANC flags in the requests
78         self.used_get_tgt = False
79         self.used_get_anc = False
80
81     def add_object(self, dn, objectclass="organizationalunit"):
82         """Adds an OU object"""
83         self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
84         res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
85         self.assertEquals(len(res), 1)
86
87     def modify_object(self, dn, attr, value):
88         """Modifies an object's USN by adding an attribute value to it"""
89         m = ldb.Message()
90         m.dn = ldb.Dn(self.test_ldb_dc, dn)
91         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
92         self.test_ldb_dc.modify(m)
93
94     def delete_attribute(self, dn, attr, value):
95         """Deletes an attribute from an object"""
96         m = ldb.Message()
97         m.dn = ldb.Dn(self.test_ldb_dc, dn)
98         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
99         self.test_ldb_dc.modify(m)
100
101     def start_new_repl_cycle(self):
102         """Resets enough state info to start a new replication cycle"""
103         # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
104         # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve
105         self.rxd_links = []
106
107         self.used_get_tgt = False
108         self.used_get_anc = False
109         # mostly preserve self.last_ctr, so that we use the last HWM
110         if self.last_ctr is not None:
111             self.last_ctr.more_data = True
112
113     def create_object_range(self, start, end, prefix="",
114                             children=None, parent_list=None):
115         """
116         Creates a block of objects. Object names are numbered sequentially,
117         using the optional prefix supplied. If the children parameter is
118         supplied it will create a parent-child hierarchy and return the
119         top-level parents separately.
120         """
121         dn_list = []
122
123         # Use dummy/empty lists if we're not creating a parent/child hierarchy
124         if children is None:
125             children = []
126
127         if parent_list is None:
128             parent_list = []
129
130         # Create the parents first, then the children.
131         # This makes it easier to see in debug when GET_ANC takes effect
132         # because the parent/children become interleaved (by default,
133         # this approach means the objects are organized into blocks of
134         # parents and blocks of children together)
135         for x in range(start, end):
136             ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
137             self.add_object(ou)
138             dn_list.append(ou)
139
140             # keep track of the top-level parents (if needed)
141             parent_list.append(ou)
142
143         # create the block of children (if needed)
144         for x in range(start, end):
145             for child in children:
146                 ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
147                 self.add_object(ou)
148                 dn_list.append(ou)
149
150         return dn_list
151
152     def assert_expected_data(self, expected_list):
153         """
154         Asserts that we received all the DNs that we expected and
155         none are missing.
156         """
157         received_list = self.rxd_dn_list
158
159         # Note that with GET_ANC Windows can end up sending the same parent
160         # object multiple times, so this might be noteworthy but doesn't
161         # warrant failing the test
162         if (len(received_list) != len(expected_list)):
163             print("Note: received %d objects but expected %d" % (len(received_list),
164                                                                  len(expected_list)))
165
166         # Check that we received every object that we were expecting
167         for dn in expected_list:
168             self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
169
170     def test_repl_integrity(self):
171         """
172         Modify the objects being replicated while the replication is still
173         in progress and check that no object loss occurs.
174         """
175
176         # The server behaviour differs between samba and Windows. Samba returns
177         # the objects in the original order (up to the pre-modify HWM). Windows
178         # incorporates the modified objects and returns them in the new order
179         # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
180         # docs state the Windows behaviour is optional.
181
182         # Create a range of objects to replicate.
183         expected_dn_list = self.create_object_range(0, 400)
184         (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
185
186         # We ask for the first page of 100 objects.
187         # For this test, we don't care what order we receive the objects in,
188         # so long as by the end we've received everything
189         self.repl_get_next()
190
191         # Modify some of the second page of objects. This should bump the highwatermark
192         for x in range(100, 200):
193             self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
194
195         (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
196         self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
197
198         # Get the remaining blocks of data
199         while not self.replication_complete():
200             self.repl_get_next()
201
202         # Check we still receive all the objects we're expecting
203         self.assert_expected_data(expected_dn_list)
204
205     def is_parent_known(self, dn, known_dn_list):
206         """
207         Returns True if the parent of the dn specified is in known_dn_list
208         """
209
210         # we can sometimes get system objects like the RID Manager returned.
211         # Ignore anything that is not under the test OU we created
212         if self.ou not in dn:
213             return True
214
215         # Remove the child portion from the name to get the parent's DN
216         name_substrings = dn.split(",")
217         del name_substrings[0]
218
219         parent_dn = ",".join(name_substrings)
220
221         # check either this object is a parent (it's parent is the top-level
222         # test object), or its parent has been seen previously
223         return parent_dn == self.ou or parent_dn in known_dn_list
224
225     def _repl_send_request(self, get_anc=False, get_tgt=False):
226         """Sends a GetNCChanges request for the next block of replication data."""
227
228         # we're just trying to mimic regular client behaviour here, so just
229         # use the highwatermark in the last response we received
230         if self.last_ctr:
231             highwatermark = self.last_ctr.new_highwatermark
232             uptodateness_vector = self.last_ctr.uptodateness_vector
233         else:
234             # this is the first replication chunk
235             highwatermark = None
236             uptodateness_vector = None
237
238         # Ask for the next block of replication data
239         replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
240         more_flags = 0
241
242         if get_anc:
243             replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC
244             self.used_get_anc = True
245
246         if get_tgt:
247             more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
248             self.used_get_tgt = True
249
250         # return the response from the DC
251         return self._get_replication(replica_flags,
252                                      max_objects=self.max_objects,
253                                      highwatermark=highwatermark,
254                                      uptodateness_vector=uptodateness_vector,
255                                      more_flags=more_flags)
256
257     def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
258         """
259         Requests the next block of replication data. This tries to simulate
260         client behaviour - if we receive a replicated object that we don't know
261         the parent of, then re-request the block with the GET_ANC flag set.
262         If we don't know the target object for a linked attribute, then
263         re-request with GET_TGT.
264         """
265
266         # send a request to the DC and get the response
267         ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
268
269         # extract the object DNs and their GUIDs from the response
270         rxd_dn_list = self._get_ctr6_dn_list(ctr6)
271         rxd_guid_list = self._get_ctr6_object_guids(ctr6)
272
273         # we'll add new objects as we discover them, so take a copy of the
274         # ones we already know about, so we can modify these lists safely
275         known_objects = self.rxd_dn_list[:]
276         known_guids = self.rxd_guids[:]
277
278         # check that we know the parent for every object received
279         for i in range(0, len(rxd_dn_list)):
280
281             dn = rxd_dn_list[i]
282             guid = rxd_guid_list[i]
283
284             if self.is_parent_known(dn, known_objects):
285
286                 # the new DN is now known so add it to the list.
287                 # It may be the parent of another child in this block
288                 known_objects.append(dn)
289                 known_guids.append(guid)
290             else:
291                 # If we've already set the GET_ANC flag then it should mean
292                 # we receive the parents before the child
293                 self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
294
295                 print("Unknown parent for %s - try GET_ANC" % dn)
296
297                 # try the same thing again with the GET_ANC flag set this time
298                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
299                                           assert_links=assert_links)
300
301         # check we know about references to any objects in the linked attritbutes
302         received_links = self._get_ctr6_links(ctr6)
303
304         # This is so that older versions of Samba fail - we want the links to be
305         # sent roughly with the objects, rather than getting all links at the end
306         if assert_links:
307             self.assertTrue(len(received_links) > 0,
308                             "Links were expected in the GetNCChanges response")
309
310         for link in received_links:
311
312             # skip any links that aren't part of the test
313             if self.ou not in link.targetDN:
314                 continue
315
316             # check the source object is known (Windows can actually send links
317             # where we don't know the source object yet). Samba shouldn't ever
318             # hit this case because it gets the links based on the source
319             if link.identifier not in known_guids:
320
321                 # If we've already set the GET_ANC flag then it should mean
322                 # this case doesn't happen
323                 self.assertFalse(get_anc, "Unknown source object for GUID %s"
324                                  % link.identifier)
325
326                 print("Unknown source GUID %s - try GET_ANC" % link.identifier)
327
328                 # try the same thing again with the GET_ANC flag set this time
329                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
330                                           assert_links=assert_links)
331
332             # check we know the target object
333             if link.targetGUID not in known_guids:
334
335                 # If we've already set the GET_TGT flag then we should have
336                 # already received any objects we need to know about
337                 self.assertFalse(get_tgt, "Unknown linked target for object %s"
338                                  % link.targetDN)
339
340                 print("Unknown target for %s - try GET_TGT" % link.targetDN)
341
342                 # try the same thing again with the GET_TGT flag set this time
343                 return self.repl_get_next(get_anc=get_anc, get_tgt=True,
344                                           assert_links=assert_links)
345
346         # store the last successful result so we know what HWM to request next
347         self.last_ctr = ctr6
348
349         # store the objects, GUIDs, and links we received
350         self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
351         self.rxd_links += self._get_ctr6_links(ctr6)
352         self.rxd_guids += self._get_ctr6_object_guids(ctr6)
353
354         return ctr6
355
356     def replication_complete(self):
357         """Returns True if the current/last replication cycle is complete"""
358
359         if self.last_ctr is None or self.last_ctr.more_data:
360             return False
361         else:
362             return True
363
364     def test_repl_integrity_get_anc(self):
365         """
366         Modify the parent objects being replicated while the replication is still
367         in progress (using GET_ANC) and check that no object loss occurs.
368         """
369
370         # Note that GET_ANC behaviour varies between Windows and Samba.
371         # On Samba GET_ANC results in the replication restarting from the very
372         # beginning. After that, Samba remembers GET_ANC and also sends the
373         # parents in subsequent requests (regardless of whether GET_ANC is
374         # specified in the later request).
375         # Windows only sends the parents if GET_ANC was specified in the last
376         # request. It will also resend a parent, even if it's already sent the
377         # parent in a previous response (whereas Samba doesn't).
378
379         # Create a small block of 50 parents, each with 2 children (A and B)
380         # This is so that we receive some children in the first block, so we
381         # can resend with GET_ANC before we learn too many parents
382         parent_dn_list = []
383         expected_dn_list = self.create_object_range(0, 50, prefix="parent",
384                                                     children=("A", "B"),
385                                                     parent_list=parent_dn_list)
386
387         # create the remaining parents and children
388         expected_dn_list += self.create_object_range(50, 150, prefix="parent",
389                                                      children=("A", "B"),
390                                                      parent_list=parent_dn_list)
391
392         # We've now got objects in the following order:
393         # [50 parents][100 children][100 parents][200 children]
394
395         # Modify the first parent so that it's now ordered last by USN
396         # This means we set the GET_ANC flag pretty much straight away
397         # because we receive the first child before the first parent
398         self.modify_object(parent_dn_list[0], "displayName", "OU0")
399
400         # modify a later block of parents so they also get reordered
401         for x in range(50, 100):
402             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
403
404         # Get the first block of objects - this should resend the request with
405         # GET_ANC set because we won't know about the first child's parent.
406         # On samba GET_ANC essentially starts the sync from scratch again, so
407         # we get this over with early before we learn too many parents
408         self.repl_get_next()
409
410         # modify the last chunk of parents. They should now have a USN higher
411         # than the highwater-mark for the replication cycle
412         for x in range(100, 150):
413             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
414
415         # Get the remaining blocks of data - this will resend the request with
416         # GET_ANC if it encounters an object it doesn't have the parent for.
417         while not self.replication_complete():
418             self.repl_get_next()
419
420         # The way the test objects have been created should force
421         # self.repl_get_next() to use the GET_ANC flag. If this doesn't
422         # actually happen, then the test isn't doing its job properly
423         self.assertTrue(self.used_get_anc,
424                         "Test didn't use the GET_ANC flag as expected")
425
426         # Check we get all the objects we're expecting
427         self.assert_expected_data(expected_dn_list)
428
429     def assert_expected_links(self, objects_with_links, link_attr="managedBy",
430                               num_expected=None):
431         """
432         Asserts that a GetNCChanges response contains any expected links
433         for the objects it contains.
434         """
435         received_links = self.rxd_links
436
437         if num_expected is None:
438             num_expected = len(objects_with_links)
439
440         self.assertTrue(len(received_links) == num_expected,
441                         "Received %d links but expected %d"
442                         % (len(received_links), num_expected))
443
444         for dn in objects_with_links:
445             self.assert_object_has_link(dn, link_attr, received_links)
446
447     def assert_object_has_link(self, dn, link_attr, received_links):
448         """
449         Queries the object in the DB and asserts there is a link in the
450         GetNCChanges response that matches.
451         """
452
453         # Look up the link attribute in the DB
454         # The extended_dn option will dump the GUID info for the link
455         # attribute (as a hex blob)
456         res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr],
457                                       controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE)
458
459         # We didn't find the expected link attribute in the DB for the object.
460         # Something has gone wrong somewhere...
461         self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s"
462                         % (dn, link_attr))
463
464         # find the received link in the list and assert that the target and
465         # source GUIDs match what's in the DB
466         for val in res[0][link_attr]:
467             # Work out the expected source and target GUIDs for the DB link
468             target_dn = ldb.Dn(self.test_ldb_dc, val)
469             targetGUID_blob = target_dn.get_extended_component("GUID")
470             sourceGUID_blob = res[0].dn.get_extended_component("GUID")
471
472             found = False
473
474             for link in received_links:
475                 if link.selfGUID_blob == sourceGUID_blob and \
476                    link.targetGUID_blob == targetGUID_blob:
477
478                     found = True
479
480                     if self._debug:
481                         print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
482                     break
483
484             self.assertTrue(found, "Did not receive expected link for DN %s" % dn)
485
486     def test_repl_get_tgt(self):
487         """
488         Creates a scenario where we should receive the linked attribute before
489         we know about the target object, and therefore need to use GET_TGT.
490         Note: Samba currently avoids this problem by sending all its links last
491         """
492
493         # create the test objects
494         reportees = self.create_object_range(0, 100, prefix="reportee")
495         managers = self.create_object_range(0, 100, prefix="manager")
496         all_objects = managers + reportees
497         expected_links = reportees
498
499         # add a link attribute to each reportee object that points to the
500         # corresponding manager object as the target
501         for i in range(0, 100):
502             self.modify_object(reportees[i], "managedBy", managers[i])
503
504         # touch the managers (the link-target objects) again to make sure the
505         # reportees (link source objects) get returned first by the replication
506         for i in range(0, 100):
507             self.modify_object(managers[i], "displayName", "OU%d" % i)
508
509         links_expected = True
510
511         # Get all the replication data - this code should resend the requests
512         # with GET_TGT
513         while not self.replication_complete():
514
515             # get the next block of replication data (this sets GET_TGT if needed)
516             self.repl_get_next(assert_links=links_expected)
517             links_expected = len(self.rxd_links) < len(expected_links)
518
519         # The way the test objects have been created should force
520         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
521         # actually happen, then the test isn't doing its job properly
522         self.assertTrue(self.used_get_tgt,
523                         "Test didn't use the GET_TGT flag as expected")
524
525         # Check we get all the objects we're expecting
526         self.assert_expected_data(all_objects)
527
528         # Check we received links for all the reportees
529         self.assert_expected_links(expected_links)
530
531     def test_repl_get_tgt_chain(self):
532         """
533         Tests the behaviour of GET_TGT with a more complicated scenario.
534         Here we create a chain of objects linked together, so if we follow
535         the link target, then we'd traverse ~200 objects each time.
536         """
537
538         # create the test objects
539         objectsA = self.create_object_range(0, 100, prefix="AAA")
540         objectsB = self.create_object_range(0, 100, prefix="BBB")
541         objectsC = self.create_object_range(0, 100, prefix="CCC")
542
543         # create a complex set of object links:
544         #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
545         # Basically each object-A should link to a circular chain of 200 B/C
546         # objects. We create the links in separate chunks here, as it makes it
547         # clearer what happens with the USN (links on Windows have their own
548         # USN, so this approach means the A->B/B->C links aren't interleaved)
549         for i in range(0, 100):
550             self.modify_object(objectsA[i], "managedBy", objectsB[i])
551
552         for i in range(0, 100):
553             self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100])
554
555         for i in range(0, 100):
556             self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100])
557
558         all_objects = objectsA + objectsB + objectsC
559         expected_links = all_objects
560
561         # the default order the objects now get returned in should be:
562         # [A0-A99][B0-B99][C0-C99]
563
564         links_expected = True
565
566         # Get all the replication data - this code should resend the requests
567         # with GET_TGT
568         while not self.replication_complete():
569
570             # get the next block of replication data (this sets GET_TGT if needed)
571             self.repl_get_next(assert_links=links_expected)
572             links_expected = len(self.rxd_links) < len(expected_links)
573
574         # The way the test objects have been created should force
575         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
576         # actually happen, then the test isn't doing its job properly
577         self.assertTrue(self.used_get_tgt,
578                         "Test didn't use the GET_TGT flag as expected")
579
580         # Check we get all the objects we're expecting
581         self.assert_expected_data(all_objects)
582
583         # Check we received links for all the reportees
584         self.assert_expected_links(expected_links)
585
586     def test_repl_integrity_link_attr(self):
587         """
588         Tests adding links to new objects while a replication is in progress.
589         """
590
591         # create some source objects for the linked attributes, sandwiched
592         # between 2 blocks of filler objects
593         filler = self.create_object_range(0, 100, prefix="filler")
594         reportees = self.create_object_range(0, 100, prefix="reportee")
595         filler += self.create_object_range(100, 200, prefix="filler")
596
597         # Start the replication and get the first block of filler objects
598         # (We're being mean here and setting the GET_TGT flag right from the
599         # start. On earlier Samba versions, if the client encountered an
600         # unknown target object and retried with GET_TGT, it would restart the
601         # replication cycle from scratch, which avoids the problem).
602         self.repl_get_next(get_tgt=True)
603
604         # create the target objects and add the links. These objects should be
605         # outside the scope of the Samba replication cycle, but the links should
606         # still get sent with the source object
607         managers = self.create_object_range(0, 100, prefix="manager")
608
609         for i in range(0, 100):
610             self.modify_object(reportees[i], "managedBy", managers[i])
611
612         expected_objects = managers + reportees + filler
613         expected_links = reportees
614
615         # complete the replication
616         while not self.replication_complete():
617             self.repl_get_next(get_tgt=True)
618
619         # If we didn't receive the most recently created objects in the last
620         # replication cycle, then kick off another replication to get them
621         if len(self.rxd_dn_list) < len(expected_objects):
622             self.repl_get_next()
623
624             while not self.replication_complete():
625                 self.repl_get_next()
626
627         # Check we get all the objects we're expecting
628         self.assert_expected_data(expected_objects)
629
630         # Check we received links for all the parents
631         self.assert_expected_links(expected_links)
632
633     def test_repl_get_anc_link_attr(self):
634         """
635         A basic GET_ANC test where the parents have linked attributes
636         """
637
638         # Create a block of 100 parents and 100 children
639         parent_dn_list = []
640         expected_dn_list = self.create_object_range(0, 100, prefix="parent",
641                                                     children=("A"),
642                                                     parent_list=parent_dn_list)
643
644         # Add links from the parents to the children
645         for x in range(0, 100):
646             self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100])
647
648         # add some filler objects at the end. This allows us to easily see
649         # which chunk the links get sent in
650         expected_dn_list += self.create_object_range(0, 100, prefix="filler")
651
652         # We've now got objects in the following order:
653         # [100 x children][100 x parents][100 x filler]
654
655         # Get the replication data - because the block of children come first,
656         # this should retry the request with GET_ANC
657         while not self.replication_complete():
658             self.repl_get_next()
659
660         self.assertTrue(self.used_get_anc,
661                         "Test didn't use the GET_ANC flag as expected")
662
663         # Check we get all the objects we're expecting
664         self.assert_expected_data(expected_dn_list)
665
666         # Check we received links for all the parents
667         self.assert_expected_links(parent_dn_list)
668
669     def test_repl_get_tgt_and_anc(self):
670         """
671         Check we can resolve an unknown ancestor when fetching the link target,
672         i.e. tests using GET_TGT and GET_ANC in combination
673         """
674
675         # Create some parent/child objects (the child will be the link target)
676         parents = []
677         all_objects = self.create_object_range(0, 100, prefix="parent",
678                                                children=["la_tgt"],
679                                                parent_list=parents)
680
681         children = [item for item in all_objects if item not in parents]
682
683         # create the link source objects and link them to the child/target
684         la_sources = self.create_object_range(0, 100, prefix="la_src")
685         all_objects += la_sources
686
687         for i in range(0, 100):
688             self.modify_object(la_sources[i], "managedBy", children[i])
689
690         expected_links = la_sources
691
692         # modify the children/targets so they come after the link source
693         for x in range(0, 100):
694             self.modify_object(children[x], "displayName", "OU%d" % x)
695
696         # modify the parents, so they now come last in the replication
697         for x in range(0, 100):
698             self.modify_object(parents[x], "displayName", "OU%d" % x)
699
700         # We've now got objects in the following order:
701         # [100 la_source][100 la_target][100 parents (of la_target)]
702
703         links_expected = True
704
705         # Get all the replication data - this code should resend the requests
706         # with GET_TGT and GET_ANC
707         while not self.replication_complete():
708
709             # get the next block of replication data (this sets GET_TGT/GET_ANC)
710             self.repl_get_next(assert_links=links_expected)
711             links_expected = len(self.rxd_links) < len(expected_links)
712
713         # The way the test objects have been created should force
714         # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
715         # doesn't actually happen, then the test isn't doing its job properly
716         self.assertTrue(self.used_get_tgt,
717                         "Test didn't use the GET_TGT flag as expected")
718         self.assertTrue(self.used_get_anc,
719                         "Test didn't use the GET_ANC flag as expected")
720
721         # Check we get all the objects we're expecting
722         self.assert_expected_data(all_objects)
723
724         # Check we received links for all the link sources
725         self.assert_expected_links(expected_links)
726
727         # Second part of test. Add some extra objects and kick off another
728         # replication. The test code will use the HWM from the last replication
729         # so we'll only receive the objects we modify below
730         self.start_new_repl_cycle()
731
732         # add an extra level of grandchildren that hang off a child
733         # that got created last time
734         new_parent = "OU=test_new_parent,%s" % children[0]
735         self.add_object(new_parent)
736         new_children = []
737
738         for x in range(0, 50):
739             dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
740             self.add_object(dn)
741             new_children.append(dn)
742
743         # replace half of the links to point to the new children
744         for x in range(0, 50):
745             self.delete_attribute(la_sources[x], "managedBy", children[x])
746             self.modify_object(la_sources[x], "managedBy", new_children[x])
747
748         # add some filler objects to fill up the 1st chunk
749         filler = self.create_object_range(0, 100, prefix="filler")
750
751         # modify the new children/targets so they come after the link source
752         for x in range(0, 50):
753             self.modify_object(new_children[x], "displayName", "OU-%d" % x)
754
755         # modify the parent, so it now comes last in the replication
756         self.modify_object(new_parent, "displayName", "OU%d" % x)
757
758         # We should now get the modified objects in the following order:
759         # [50 links (x 2)][100 filler][50 new children][new parent]
760         # Note that the link sources aren't actually sent (their new linked
761         # attributes are sent, but apart from that, nothing has changed)
762         all_objects = filler + new_children + [new_parent]
763         expected_links = la_sources[:50]
764
765         links_expected = True
766
767         while not self.replication_complete():
768             self.repl_get_next(assert_links=links_expected)
769             links_expected = len(self.rxd_links) < len(expected_links)
770
771         self.assertTrue(self.used_get_tgt,
772                         "Test didn't use the GET_TGT flag as expected")
773         self.assertTrue(self.used_get_anc,
774                         "Test didn't use the GET_ANC flag as expected")
775
776         # Check we get all the objects we're expecting
777         self.assert_expected_data(all_objects)
778
779         # Check we received links (50 deleted links and 50 new)
780         self.assert_expected_links(expected_links, num_expected=100)
781
782     def _repl_integrity_obj_deletion(self, delete_link_source=True):
783         """
784         Tests deleting link objects while a replication is in progress.
785         """
786
787         # create some objects and link them together, with some filler
788         # object in between the link sources
789         la_sources = self.create_object_range(0, 100, prefix="la_source")
790         la_targets = self.create_object_range(0, 100, prefix="la_targets")
791
792         for i in range(0, 50):
793             self.modify_object(la_sources[i], "managedBy", la_targets[i])
794
795         filler = self.create_object_range(0, 100, prefix="filler")
796
797         for i in range(50, 100):
798             self.modify_object(la_sources[i], "managedBy", la_targets[i])
799
800         # touch the targets so that the sources get replicated first
801         for i in range(0, 100):
802             self.modify_object(la_targets[i], "displayName", "OU%d" % i)
803
804         # objects should now be in the following USN order:
805         # [50 la_source][100 filler][50 la_source][100 la_target]
806
807         # Get the first block containing 50 link sources
808         self.repl_get_next()
809
810         # delete either the link targets or link source objects
811         if delete_link_source:
812             objects_to_delete = la_sources
813             # in GET_TGT testenvs we only receive the first 50 source objects
814             expected_objects = la_sources[:50] + la_targets + filler
815         else:
816             objects_to_delete = la_targets
817             expected_objects = la_sources + filler
818
819         for obj in objects_to_delete:
820             self.ldb_dc2.delete(obj)
821
822         # complete the replication
823         while not self.replication_complete():
824             self.repl_get_next()
825
826         # Check we get all the objects we're expecting
827         self.assert_expected_data(expected_objects)
828
829         # we can't use assert_expected_links() here because it tries to check
830         # against the deleted objects on the DC. (Although we receive some
831         # links from the first block processed, the Samba client should end up
832         # deleting these, as the source/target object involved is deleted)
833         self.assertTrue(len(self.rxd_links) == 50,
834                         "Expected 50 links, not %d" % len(self.rxd_links))
835
836     def test_repl_integrity_src_obj_deletion(self):
837         self._repl_integrity_obj_deletion(delete_link_source=True)
838
839     def test_repl_integrity_tgt_obj_deletion(self):
840         self._repl_integrity_obj_deletion(delete_link_source=False)
841
842     def restore_deleted_object(self, guid, new_dn):
843         """Re-animates a deleted object"""
844
845         res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"],
846                                   controls=['show_deleted:1'], scope=ldb.SCOPE_BASE)
847         if len(res) != 1:
848             return
849
850         msg = ldb.Message()
851         msg.dn = res[0].dn
852         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
853         msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
854         self.test_ldb_dc.modify(msg, ["show_deleted:1"])
855
856     def sync_DCs(self, nc_dn=None):
857         # make sure DC1 has all the changes we've made to DC2
858         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn)
859
860     def get_object_guid(self, dn):
861         res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
862         return res[0]['objectGUID'][0]
863
864     def set_dc_connection(self, conn):
865         """
866         Switches over the connection state info that the underlying drs_base
867         class uses so that we replicate with a different DC.
868         """
869         self.default_hwm = conn.default_hwm
870         self.default_utdv = conn.default_utdv
871         self.drs = conn.drs
872         self.drs_handle = conn.drs_handle
873         self.set_test_ldb_dc(conn.ldb_dc)
874
875     def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
876                                              expected_links):
877         """
878         Replicates against both the primary and secondary DCs in the testenv
879         and checks that both return the expected results.
880         """
881         print("Checking replication against primary test DC...")
882
883         # get the replication data from the test DC first
884         while not self.replication_complete():
885             self.repl_get_next()
886
887         # Check we get all the objects and links we're expecting
888         self.assert_expected_data(all_objects)
889         self.assert_expected_links(expected_links)
890
891         # switch over the DC state info so we now talk to the peer DC
892         self.set_dc_connection(peer_conn)
893         self.init_test_state()
894
895         print("Checking replication against secondary test DC...")
896
897         # check that we get the same information from the 2nd DC
898         while not self.replication_complete():
899             self.repl_get_next()
900
901         self.assert_expected_data(all_objects)
902         self.assert_expected_links(expected_links)
903
904         # switch back to using the default connection
905         self.set_dc_connection(self.default_conn)
906
907     def test_repl_integrity_obj_reanimation(self):
908         """
909         Checks receiving links for a re-animated object doesn't lose links.
910         We test this against the peer DC to make sure it doesn't drop links.
911         """
912
913         # This test is a little different in that we're particularly interested
914         # in exercising the replmd client code on the second DC.
915         # First, make sure the peer DC has the base OU, then connect to it (so
916         # we store its inital HWM)
917         self.sync_DCs()
918         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
919
920         # create the link source/target objects
921         la_sources = self.create_object_range(0, 100, prefix="la_src")
922         la_targets = self.create_object_range(0, 100, prefix="la_tgt")
923
924         # store the target object's GUIDs (we need to know these to reanimate them)
925         target_guids = []
926
927         for dn in la_targets:
928             target_guids.append(self.get_object_guid(dn))
929
930         # delete the link target
931         for x in range(0, 100):
932             self.ldb_dc2.delete(la_targets[x])
933
934         # sync the DCs, then disable replication. We want the peer DC to get
935         # all the following changes in a single replication cycle
936         self.sync_DCs()
937         self._disable_all_repl(self.dnsname_dc2)
938
939         # restore the target objects for the linked attributes again
940         for x in range(0, 100):
941             self.restore_deleted_object(target_guids[x], la_targets[x])
942
943         # add the links
944         for x in range(0, 100):
945             self.modify_object(la_sources[x], "managedBy", la_targets[x])
946
947         # create some additional filler objects
948         filler = self.create_object_range(0, 100, prefix="filler")
949
950         # modify the targets so they now come last
951         for x in range(0, 100):
952             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
953
954         # the objects should now be sent in the following order:
955         # [la sources + links][filler][la targets]
956         all_objects = la_sources + la_targets + filler
957         expected_links = la_sources
958
959         # Enable replication again make sure the 2 DCs are back in sync
960         self._enable_all_repl(self.dnsname_dc2)
961         self.sync_DCs()
962
963         # Get the replication data from each DC in turn.
964         # Check that both give us all the objects and links we're expecting,
965         # i.e. no links were lost
966         self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
967                                                   expected_links)
968
969     def test_repl_integrity_cross_partition_links(self):
970         """
971         Checks that a cross-partition link to an unknown target object does
972         not result in missing links.
973         """
974
975         # check the peer DC is up-to-date, then connect (storing its HWM)
976         self.sync_DCs()
977         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
978
979         # stop replication so the peer gets the following objects in one go
980         self._disable_all_repl(self.dnsname_dc2)
981
982         # create a link source object in the main NC
983         la_source = "OU=cross_nc_src,%s" % self.ou
984         self.add_object(la_source)
985
986         # create the link target (a server object) in the config NC
987         rand = random.randint(1, 10000000)
988         la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \
989                     "CN=Sites,%s" % (rand, self.config_dn)
990         self.add_object(la_target, objectclass="server")
991
992         # add a cross-partition link between the two
993         self.modify_object(la_source, "managedBy", la_target)
994
995         # First, sync across to the peer the NC containing the link source object
996         self.sync_DCs()
997
998         # Now, before the peer has received the partition containing the target
999         # object, try replicating from the peer. It will only know about half
1000         # of the link at this point, but it should be a valid scenario
1001         self.set_dc_connection(peer_conn)
1002
1003         while not self.replication_complete():
1004             # pretend we've received other link targets out of order and that's
1005             # forced us to use GET_TGT. This checks the peer doesn't fail trying
1006             # to fetch a cross-partition target object that doesn't exist
1007             self.repl_get_next(get_tgt=True)
1008
1009         self.set_dc_connection(self.default_conn)
1010         self.init_test_state()
1011
1012         # Now sync across the partition containing the link target object
1013         self.sync_DCs(nc_dn=self.config_dn)
1014         self._enable_all_repl(self.dnsname_dc2)
1015
1016         # Get the replication data from each DC in turn.
1017         # Check that both return the cross-partition link (note we're not
1018         # checking the config domain NC here for simplicity)
1019         self.assert_DCs_replication_is_consistent(peer_conn,
1020                                                   all_objects=[la_source],
1021                                                   expected_links=[la_source])
1022
1023         # the cross-partition linked attribute has a missing backlink. Check
1024         # that we can still delete it successfully
1025         self.delete_attribute(la_source, "managedBy", la_target)
1026         self.sync_DCs()
1027
1028         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1029                                       attrs=["managedBy"],
1030                                       controls=['extended_dn:1:0'],
1031                                       scope=ldb.SCOPE_BASE)
1032         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1033                          % la_source)
1034         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1035                                       attrs=["managedBy"],
1036                                       controls=['extended_dn:1:0'],
1037                                       scope=ldb.SCOPE_BASE)
1038         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1039                          % la_source)
1040
1041         # Check receiving a cross-partition link to a deleted target.
1042         # Delete the target and make sure the deletion is sync'd between DCs
1043         target_guid = self.get_object_guid(la_target)
1044         self.test_ldb_dc.delete(la_target)
1045         self.sync_DCs(nc_dn=self.config_dn)        
1046         self._disable_all_repl(self.dnsname_dc2)
1047
1048         # re-animate the target
1049         self.restore_deleted_object(target_guid, la_target)
1050         self.modify_object(la_source, "managedBy", la_target)
1051
1052         # now sync the link - because the target is in another partition, the
1053         # peer DC receives a link for a deleted target, which it should accept
1054         self.sync_DCs()
1055         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1056                                       attrs=["managedBy"],
1057                                       controls=['extended_dn:1:0'],
1058                                       scope=ldb.SCOPE_BASE)
1059         self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute"
1060                         % la_source)
1061
1062         # cleanup the server object we created in the Configuration partition
1063         self.test_ldb_dc.delete(la_target)
1064         self._enable_all_repl(self.dnsname_dc2)
1065
1066     def test_repl_get_tgt_multivalued_links(self):
1067         """Tests replication with multi-valued link attributes."""
1068
1069         # create the target/source objects and link them together
1070         la_targets = self.create_object_range(0, 500, prefix="la_tgt")
1071         la_source = "CN=la_src,%s" % self.ou
1072         self.add_object(la_source, objectclass="msExchConfigurationContainer")
1073
1074         for tgt in la_targets:
1075             self.modify_object(la_source, "addressBookRoots2", tgt)
1076
1077         filler = self.create_object_range(0, 100, prefix="filler")
1078
1079         # We should receive the objects/links in the following order:
1080         # [500 targets + 1 source][500 links][100 filler]
1081         expected_objects = la_targets + [la_source] + filler
1082         link_only_chunk = False
1083
1084         # First do the replication without needing GET_TGT
1085         while not self.replication_complete():
1086             ctr6 = self.repl_get_next()
1087
1088             if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
1089                 link_only_chunk = True
1090
1091         # we should receive one chunk that contains only links
1092         self.assertTrue(link_only_chunk,
1093                         "Expected to receive a chunk containing only links")
1094
1095         # check we received all the expected objects/links
1096         self.assert_expected_data(expected_objects)
1097         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1098
1099         # Do the replication again, forcing the use of GET_TGT this time
1100         self.init_test_state()
1101
1102         for x in range(0, 500):
1103             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
1104
1105         # The objects/links should get sent in the following order:
1106         # [1 source][500 targets][500 links][100 filler]
1107
1108         while not self.replication_complete():
1109             ctr6 = self.repl_get_next()
1110
1111         self.assertTrue(self.used_get_tgt,
1112                         "Test didn't use the GET_TGT flag as expected")
1113
1114         # check we received all the expected objects/links
1115         self.assert_expected_data(expected_objects)
1116         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1117
1118
1119 class DcConnection:
1120     """Helper class to track a connection to another DC"""
1121
1122     def __init__(self, drs_base, ldb_dc, dnsname_dc):
1123         self.ldb_dc = ldb_dc
1124         (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1125         (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1126
1127