PY3: change shebang to python3 in source4/torture dir
[samba.git] / source4 / torture / drs / python / getncchanges.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Catalyst.Net Ltd. 2017
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
28 #       getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
29 #
30
31 from __future__ import print_function
32 import drs_base
33 import samba.tests
34 import ldb
35 from ldb import SCOPE_BASE
36 import random
37
38 from samba.dcerpc import drsuapi
39
40
41 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
42     def setUp(self):
43         super(DrsReplicaSyncIntegrityTestCase, self).setUp()
44
45         self.init_test_state()
46
47         # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
48         # the vampire_dc), so we point this test directly at that DC
49         self.set_test_ldb_dc(self.ldb_dc2)
50
51         self.ou = str(samba.tests.create_test_ou(self.test_ldb_dc,
52                                                  "getncchanges"))
53         self.base_dn = self.test_ldb_dc.get_default_basedn()
54
55         self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
56         self.set_dc_connection(self.default_conn)
57
58     def tearDown(self):
59         super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
60         # tidyup groups and users
61         try:
62             self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
63         except ldb.LdbError as e:
64             (enum, string) = e.args
65             if enum == ldb.ERR_NO_SUCH_OBJECT:
66                 pass
67
68     def init_test_state(self):
69         self.rxd_dn_list = []
70         self.rxd_links = []
71         self.rxd_guids = []
72         self.last_ctr = None
73
74         # 100 is the minimum max_objects that Microsoft seems to honour
75         # (the max honoured is 400ish), so we use that in these tests
76         self.max_objects = 100
77
78         # store whether we used GET_TGT/GET_ANC flags in the requests
79         self.used_get_tgt = False
80         self.used_get_anc = False
81
82     def add_object(self, dn, objectclass="organizationalunit"):
83         """Adds an OU object"""
84         self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
85         res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
86         self.assertEquals(len(res), 1)
87
88     def modify_object(self, dn, attr, value):
89         """Modifies an object's USN by adding an attribute value to it"""
90         m = ldb.Message()
91         m.dn = ldb.Dn(self.test_ldb_dc, dn)
92         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
93         self.test_ldb_dc.modify(m)
94
95     def delete_attribute(self, dn, attr, value):
96         """Deletes an attribute from an object"""
97         m = ldb.Message()
98         m.dn = ldb.Dn(self.test_ldb_dc, dn)
99         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
100         self.test_ldb_dc.modify(m)
101
102     def start_new_repl_cycle(self):
103         """Resets enough state info to start a new replication cycle"""
104         # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
105         # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
106         # resolve
107         self.rxd_links = []
108
109         self.used_get_tgt = False
110         self.used_get_anc = False
111         # mostly preserve self.last_ctr, so that we use the last HWM
112         if self.last_ctr is not None:
113             self.last_ctr.more_data = True
114
115     def create_object_range(self, start, end, prefix="",
116                             children=None, parent_list=None):
117         """
118         Creates a block of objects. Object names are numbered sequentially,
119         using the optional prefix supplied. If the children parameter is
120         supplied it will create a parent-child hierarchy and return the
121         top-level parents separately.
122         """
123         dn_list = []
124
125         # Use dummy/empty lists if we're not creating a parent/child hierarchy
126         if children is None:
127             children = []
128
129         if parent_list is None:
130             parent_list = []
131
132         # Create the parents first, then the children.
133         # This makes it easier to see in debug when GET_ANC takes effect
134         # because the parent/children become interleaved (by default,
135         # this approach means the objects are organized into blocks of
136         # parents and blocks of children together)
137         for x in range(start, end):
138             ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
139             self.add_object(ou)
140             dn_list.append(ou)
141
142             # keep track of the top-level parents (if needed)
143             parent_list.append(ou)
144
145         # create the block of children (if needed)
146         for x in range(start, end):
147             for child in children:
148                 ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
149                 self.add_object(ou)
150                 dn_list.append(ou)
151
152         return dn_list
153
154     def assert_expected_data(self, expected_list):
155         """
156         Asserts that we received all the DNs that we expected and
157         none are missing.
158         """
159         received_list = self.rxd_dn_list
160
161         # Note that with GET_ANC Windows can end up sending the same parent
162         # object multiple times, so this might be noteworthy but doesn't
163         # warrant failing the test
164         num_received = len(received_list)
165         num_expected = len(expected_list)
166         if num_received != num_expected:
167             print("Note: received %d objects but expected %d" % (num_received,
168                                                                  num_expected))
169
170         # Check that we received every object that we were expecting
171         for dn in expected_list:
172             self.assertTrue(dn in received_list,
173                             "DN '%s' missing from replication." % dn)
174
175     def test_repl_integrity(self):
176         """
177         Modify the objects being replicated while the replication is still
178         in progress and check that no object loss occurs.
179         """
180
181         # The server behaviour differs between samba and Windows. Samba returns
182         # the objects in the original order (up to the pre-modify HWM). Windows
183         # incorporates the modified objects and returns them in the new order
184         # (i.e. modified objects last), up to the post-modify HWM. The
185         # Microsoft docs state the Windows behaviour is optional.
186
187         # Create a range of objects to replicate.
188         expected_dn_list = self.create_object_range(0, 400)
189         (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
190
191         # We ask for the first page of 100 objects.
192         # For this test, we don't care what order we receive the objects in,
193         # so long as by the end we've received everything
194         self.repl_get_next()
195
196         # Modify some of the second page of objects. This should bump the
197         # highwatermark
198         for x in range(100, 200):
199             self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
200
201         (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
202         self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
203
204         # Get the remaining blocks of data
205         while not self.replication_complete():
206             self.repl_get_next()
207
208         # Check we still receive all the objects we're expecting
209         self.assert_expected_data(expected_dn_list)
210
211     def is_parent_known(self, dn, known_dn_list):
212         """
213         Returns True if the parent of the dn specified is in known_dn_list
214         """
215
216         # we can sometimes get system objects like the RID Manager returned.
217         # Ignore anything that is not under the test OU we created
218         if self.ou not in dn:
219             return True
220
221         # Remove the child portion from the name to get the parent's DN
222         name_substrings = dn.split(",")
223         del name_substrings[0]
224
225         parent_dn = ",".join(name_substrings)
226
227         # check either this object is a parent (it's parent is the top-level
228         # test object), or its parent has been seen previously
229         return parent_dn == self.ou or parent_dn in known_dn_list
230
231     def _repl_send_request(self, get_anc=False, get_tgt=False):
232         """
233         Sends a GetNCChanges request for the next block of replication data.
234         """
235
236         # we're just trying to mimic regular client behaviour here, so just
237         # use the highwatermark in the last response we received
238         if self.last_ctr:
239             highwatermark = self.last_ctr.new_highwatermark
240             uptodateness_vector = self.last_ctr.uptodateness_vector
241         else:
242             # this is the first replication chunk
243             highwatermark = None
244             uptodateness_vector = None
245
246         # Ask for the next block of replication data
247         replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
248         more_flags = 0
249
250         if get_anc:
251             replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
252             self.used_get_anc = True
253
254         if get_tgt:
255             more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
256             self.used_get_tgt = True
257
258         # return the response from the DC
259         return self._get_replication(replica_flags,
260                                      max_objects=self.max_objects,
261                                      highwatermark=highwatermark,
262                                      uptodateness_vector=uptodateness_vector,
263
264                                      more_flags=more_flags)
265
266     def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
267         """
268         Requests the next block of replication data. This tries to simulate
269         client behaviour - if we receive a replicated object that we don't know
270         the parent of, then re-request the block with the GET_ANC flag set.
271         If we don't know the target object for a linked attribute, then
272         re-request with GET_TGT.
273         """
274
275         # send a request to the DC and get the response
276         ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
277
278         # extract the object DNs and their GUIDs from the response
279         rxd_dn_list = self._get_ctr6_dn_list(ctr6)
280         rxd_guid_list = self._get_ctr6_object_guids(ctr6)
281
282         # we'll add new objects as we discover them, so take a copy of the
283         # ones we already know about, so we can modify these lists safely
284         known_objects = self.rxd_dn_list[:]
285         known_guids = self.rxd_guids[:]
286
287         # check that we know the parent for every object received
288         for i in range(0, len(rxd_dn_list)):
289
290             dn = rxd_dn_list[i]
291             guid = rxd_guid_list[i]
292
293             if self.is_parent_known(dn, known_objects):
294
295                 # the new DN is now known so add it to the list.
296                 # It may be the parent of another child in this block
297                 known_objects.append(dn)
298                 known_guids.append(guid)
299             else:
300                 # If we've already set the GET_ANC flag then it should mean
301                 # we receive the parents before the child
302                 self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
303
304                 print("Unknown parent for %s - try GET_ANC" % dn)
305
306                 # try the same thing again with the GET_ANC flag set this time
307                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
308                                           assert_links=assert_links)
309
310         # check we know about references to any objects in the linked attrs
311         received_links = self._get_ctr6_links(ctr6)
312
313         # This is so that older versions of Samba fail - we want the links to
314         # be sent roughly with the objects, rather than getting all links at
315         # the end
316         if assert_links:
317             self.assertTrue(len(received_links) > 0,
318                             "Links were expected in the GetNCChanges response")
319
320         for link in received_links:
321
322             # skip any links that aren't part of the test
323             if self.ou not in link.targetDN:
324                 continue
325
326             # check the source object is known (Windows can actually send links
327             # where we don't know the source object yet). Samba shouldn't ever
328             # hit this case because it gets the links based on the source
329             if link.identifier not in known_guids:
330
331                 # If we've already set the GET_ANC flag then it should mean
332                 # this case doesn't happen
333                 self.assertFalse(get_anc, "Unknown source object for GUID %s"
334                                  % link.identifier)
335
336                 print("Unknown source GUID %s - try GET_ANC" % link.identifier)
337
338                 # try the same thing again with the GET_ANC flag set this time
339                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
340                                           assert_links=assert_links)
341
342             # check we know the target object
343             if link.targetGUID not in known_guids:
344
345                 # If we've already set the GET_TGT flag then we should have
346                 # already received any objects we need to know about
347                 self.assertFalse(get_tgt, "Unknown linked target for object %s"
348                                  % link.targetDN)
349
350                 print("Unknown target for %s - try GET_TGT" % link.targetDN)
351
352                 # try the same thing again with the GET_TGT flag set this time
353                 return self.repl_get_next(get_anc=get_anc, get_tgt=True,
354                                           assert_links=assert_links)
355
356         # store the last successful result so we know what HWM to request next
357         self.last_ctr = ctr6
358
359         # store the objects, GUIDs, and links we received
360         self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
361         self.rxd_links += self._get_ctr6_links(ctr6)
362         self.rxd_guids += self._get_ctr6_object_guids(ctr6)
363
364         return ctr6
365
366     def replication_complete(self):
367         """Returns True if the current/last replication cycle is complete"""
368
369         if self.last_ctr is None or self.last_ctr.more_data:
370             return False
371         else:
372             return True
373
374     def test_repl_integrity_get_anc(self):
375         """
376         Modify the parent objects being replicated while the replication is
377         still in progress (using GET_ANC) and check that no object loss occurs.
378         """
379
380         # Note that GET_ANC behaviour varies between Windows and Samba.
381         # On Samba GET_ANC results in the replication restarting from the very
382         # beginning. After that, Samba remembers GET_ANC and also sends the
383         # parents in subsequent requests (regardless of whether GET_ANC is
384         # specified in the later request).
385         # Windows only sends the parents if GET_ANC was specified in the last
386         # request. It will also resend a parent, even if it's already sent the
387         # parent in a previous response (whereas Samba doesn't).
388
389         # Create a small block of 50 parents, each with 2 children (A and B)
390         # This is so that we receive some children in the first block, so we
391         # can resend with GET_ANC before we learn too many parents
392         parent_dn_list = []
393         expected_dn_list = self.create_object_range(0, 50, prefix="parent",
394                                                     children=("A", "B"),
395                                                     parent_list=parent_dn_list)
396
397         # create the remaining parents and children
398         expected_dn_list += self.create_object_range(50, 150, prefix="parent",
399                                                      children=("A", "B"),
400                                                      parent_list=parent_dn_list)
401
402         # We've now got objects in the following order:
403         # [50 parents][100 children][100 parents][200 children]
404
405         # Modify the first parent so that it's now ordered last by USN
406         # This means we set the GET_ANC flag pretty much straight away
407         # because we receive the first child before the first parent
408         self.modify_object(parent_dn_list[0], "displayName", "OU0")
409
410         # modify a later block of parents so they also get reordered
411         for x in range(50, 100):
412             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
413
414         # Get the first block of objects - this should resend the request with
415         # GET_ANC set because we won't know about the first child's parent.
416         # On samba GET_ANC essentially starts the sync from scratch again, so
417         # we get this over with early before we learn too many parents
418         self.repl_get_next()
419
420         # modify the last chunk of parents. They should now have a USN higher
421         # than the highwater-mark for the replication cycle
422         for x in range(100, 150):
423             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
424
425         # Get the remaining blocks of data - this will resend the request with
426         # GET_ANC if it encounters an object it doesn't have the parent for.
427         while not self.replication_complete():
428             self.repl_get_next()
429
430         # The way the test objects have been created should force
431         # self.repl_get_next() to use the GET_ANC flag. If this doesn't
432         # actually happen, then the test isn't doing its job properly
433         self.assertTrue(self.used_get_anc,
434                         "Test didn't use the GET_ANC flag as expected")
435
436         # Check we get all the objects we're expecting
437         self.assert_expected_data(expected_dn_list)
438
439     def assert_expected_links(self, objects_with_links, link_attr="managedBy",
440                               num_expected=None):
441         """
442         Asserts that a GetNCChanges response contains any expected links
443         for the objects it contains.
444         """
445         received_links = self.rxd_links
446
447         if num_expected is None:
448             num_expected = len(objects_with_links)
449
450         self.assertTrue(len(received_links) == num_expected,
451                         "Received %d links but expected %d"
452                         % (len(received_links), num_expected))
453
454         for dn in objects_with_links:
455             self.assert_object_has_link(dn, link_attr, received_links)
456
457     def assert_object_has_link(self, dn, link_attr, received_links):
458         """
459         Queries the object in the DB and asserts there is a link in the
460         GetNCChanges response that matches.
461         """
462
463         # Look up the link attribute in the DB
464         # The extended_dn option will dump the GUID info for the link
465         # attribute (as a hex blob)
466         res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
467                                       attrs=[link_attr],
468                                       controls=['extended_dn:1:0'],
469                                       scope=ldb.SCOPE_BASE)
470
471         # We didn't find the expected link attribute in the DB for the object.
472         # Something has gone wrong somewhere...
473         self.assertTrue(link_attr in res[0],
474                         "%s in DB doesn't have attribute %s" % (dn, link_attr))
475
476         # find the received link in the list and assert that the target and
477         # source GUIDs match what's in the DB
478         for val in [str(val) for val in res[0][link_attr]]:
479             # Work out the expected source and target GUIDs for the DB link
480             target_dn = ldb.Dn(self.test_ldb_dc, val)
481             targetGUID_blob = target_dn.get_extended_component("GUID")
482             sourceGUID_blob = res[0].dn.get_extended_component("GUID")
483
484             found = False
485
486             for link in received_links:
487                 if link.selfGUID_blob == sourceGUID_blob and \
488                    link.targetGUID_blob == targetGUID_blob:
489
490                     found = True
491
492                     if self._debug:
493                         print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
494                     break
495
496             self.assertTrue(found,
497                             "Did not receive expected link for DN %s" % dn)
498
499     def test_repl_get_tgt(self):
500         """
501         Creates a scenario where we should receive the linked attribute before
502         we know about the target object, and therefore need to use GET_TGT.
503         Note: Samba currently avoids this problem by sending all its links last
504         """
505
506         # create the test objects
507         reportees = self.create_object_range(0, 100, prefix="reportee")
508         managers = self.create_object_range(0, 100, prefix="manager")
509         all_objects = managers + reportees
510         expected_links = reportees
511
512         # add a link attribute to each reportee object that points to the
513         # corresponding manager object as the target
514         for i in range(0, 100):
515             self.modify_object(reportees[i], "managedBy", managers[i])
516
517         # touch the managers (the link-target objects) again to make sure the
518         # reportees (link source objects) get returned first by the replication
519         for i in range(0, 100):
520             self.modify_object(managers[i], "displayName", "OU%d" % i)
521
522         links_expected = True
523
524         # Get all the replication data - this code should resend the requests
525         # with GET_TGT
526         while not self.replication_complete():
527
528             # get the next block of replication data (this sets GET_TGT
529             # if needed)
530             self.repl_get_next(assert_links=links_expected)
531             links_expected = len(self.rxd_links) < len(expected_links)
532
533         # The way the test objects have been created should force
534         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
535         # actually happen, then the test isn't doing its job properly
536         self.assertTrue(self.used_get_tgt,
537                         "Test didn't use the GET_TGT flag as expected")
538
539         # Check we get all the objects we're expecting
540         self.assert_expected_data(all_objects)
541
542         # Check we received links for all the reportees
543         self.assert_expected_links(expected_links)
544
545     def test_repl_get_tgt_chain(self):
546         """
547         Tests the behaviour of GET_TGT with a more complicated scenario.
548         Here we create a chain of objects linked together, so if we follow
549         the link target, then we'd traverse ~200 objects each time.
550         """
551
552         # create the test objects
553         objectsA = self.create_object_range(0, 100, prefix="AAA")
554         objectsB = self.create_object_range(0, 100, prefix="BBB")
555         objectsC = self.create_object_range(0, 100, prefix="CCC")
556
557         # create a complex set of object links:
558         #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
559         # Basically each object-A should link to a circular chain of 200 B/C
560         # objects. We create the links in separate chunks here, as it makes it
561         # clearer what happens with the USN (links on Windows have their own
562         # USN, so this approach means the A->B/B->C links aren't interleaved)
563         for i in range(0, 100):
564             self.modify_object(objectsA[i], "managedBy", objectsB[i])
565
566         for i in range(0, 100):
567             self.modify_object(objectsB[i], "managedBy",
568                                objectsC[(i + 1) % 100])
569
570         for i in range(0, 100):
571             self.modify_object(objectsC[i], "managedBy",
572                                objectsB[(i + 1) % 100])
573
574         all_objects = objectsA + objectsB + objectsC
575         expected_links = all_objects
576
577         # the default order the objects now get returned in should be:
578         # [A0-A99][B0-B99][C0-C99]
579
580         links_expected = True
581
582         # Get all the replication data - this code should resend the requests
583         # with GET_TGT
584         while not self.replication_complete():
585
586             # get the next block of replication data (this sets GET_TGT
587             # if needed)
588             self.repl_get_next(assert_links=links_expected)
589             links_expected = len(self.rxd_links) < len(expected_links)
590
591         # The way the test objects have been created should force
592         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
593         # actually happen, then the test isn't doing its job properly
594         self.assertTrue(self.used_get_tgt,
595                         "Test didn't use the GET_TGT flag as expected")
596
597         # Check we get all the objects we're expecting
598         self.assert_expected_data(all_objects)
599
600         # Check we received links for all the reportees
601         self.assert_expected_links(expected_links)
602
603     def test_repl_integrity_link_attr(self):
604         """
605         Tests adding links to new objects while a replication is in progress.
606         """
607
608         # create some source objects for the linked attributes, sandwiched
609         # between 2 blocks of filler objects
610         filler = self.create_object_range(0, 100, prefix="filler")
611         reportees = self.create_object_range(0, 100, prefix="reportee")
612         filler += self.create_object_range(100, 200, prefix="filler")
613
614         # Start the replication and get the first block of filler objects
615         # (We're being mean here and setting the GET_TGT flag right from the
616         # start. On earlier Samba versions, if the client encountered an
617         # unknown target object and retried with GET_TGT, it would restart the
618         # replication cycle from scratch, which avoids the problem).
619         self.repl_get_next(get_tgt=True)
620
621         # create the target objects and add the links. These objects should be
622         # outside the scope of the Samba replication cycle, but the links
623         # should still get sent with the source object
624         managers = self.create_object_range(0, 100, prefix="manager")
625
626         for i in range(0, 100):
627             self.modify_object(reportees[i], "managedBy", managers[i])
628
629         expected_objects = managers + reportees + filler
630         expected_links = reportees
631
632         # complete the replication
633         while not self.replication_complete():
634             self.repl_get_next(get_tgt=True)
635
636         # If we didn't receive the most recently created objects in the last
637         # replication cycle, then kick off another replication to get them
638         if len(self.rxd_dn_list) < len(expected_objects):
639             self.repl_get_next()
640
641             while not self.replication_complete():
642                 self.repl_get_next()
643
644         # Check we get all the objects we're expecting
645         self.assert_expected_data(expected_objects)
646
647         # Check we received links for all the parents
648         self.assert_expected_links(expected_links)
649
650     def test_repl_get_anc_link_attr(self):
651         """
652         A basic GET_ANC test where the parents have linked attributes
653         """
654
655         # Create a block of 100 parents and 100 children
656         parent_dn_list = []
657         expected_dn_list = self.create_object_range(0, 100, prefix="parent",
658                                                     children=("A"),
659                                                     parent_list=parent_dn_list)
660
661         # Add links from the parents to the children
662         for x in range(0, 100):
663             self.modify_object(parent_dn_list[x], "managedBy",
664                                expected_dn_list[x + 100])
665
666         # add some filler objects at the end. This allows us to easily see
667         # which chunk the links get sent in
668         expected_dn_list += self.create_object_range(0, 100, prefix="filler")
669
670         # We've now got objects in the following order:
671         # [100 x children][100 x parents][100 x filler]
672
673         # Get the replication data - because the block of children come first,
674         # this should retry the request with GET_ANC
675         while not self.replication_complete():
676             self.repl_get_next()
677
678         self.assertTrue(self.used_get_anc,
679                         "Test didn't use the GET_ANC flag as expected")
680
681         # Check we get all the objects we're expecting
682         self.assert_expected_data(expected_dn_list)
683
684         # Check we received links for all the parents
685         self.assert_expected_links(parent_dn_list)
686
687     def test_repl_get_tgt_and_anc(self):
688         """
689         Check we can resolve an unknown ancestor when fetching the link target,
690         i.e. tests using GET_TGT and GET_ANC in combination
691         """
692
693         # Create some parent/child objects (the child will be the link target)
694         parents = []
695         all_objects = self.create_object_range(0, 100, prefix="parent",
696                                                children=["la_tgt"],
697                                                parent_list=parents)
698
699         children = [item for item in all_objects if item not in parents]
700
701         # create the link source objects and link them to the child/target
702         la_sources = self.create_object_range(0, 100, prefix="la_src")
703         all_objects += la_sources
704
705         for i in range(0, 100):
706             self.modify_object(la_sources[i], "managedBy", children[i])
707
708         expected_links = la_sources
709
710         # modify the children/targets so they come after the link source
711         for x in range(0, 100):
712             self.modify_object(children[x], "displayName", "OU%d" % x)
713
714         # modify the parents, so they now come last in the replication
715         for x in range(0, 100):
716             self.modify_object(parents[x], "displayName", "OU%d" % x)
717
718         # We've now got objects in the following order:
719         # [100 la_source][100 la_target][100 parents (of la_target)]
720
721         links_expected = True
722
723         # Get all the replication data - this code should resend the requests
724         # with GET_TGT and GET_ANC
725         while not self.replication_complete():
726
727             # get the next block of replication data (this sets
728             # GET_TGT/GET_ANC)
729             self.repl_get_next(assert_links=links_expected)
730             links_expected = len(self.rxd_links) < len(expected_links)
731
732         # The way the test objects have been created should force
733         # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
734         # doesn't actually happen, then the test isn't doing its job properly
735         self.assertTrue(self.used_get_tgt,
736                         "Test didn't use the GET_TGT flag as expected")
737         self.assertTrue(self.used_get_anc,
738                         "Test didn't use the GET_ANC flag as expected")
739
740         # Check we get all the objects we're expecting
741         self.assert_expected_data(all_objects)
742
743         # Check we received links for all the link sources
744         self.assert_expected_links(expected_links)
745
746         # Second part of test. Add some extra objects and kick off another
747         # replication. The test code will use the HWM from the last replication
748         # so we'll only receive the objects we modify below
749         self.start_new_repl_cycle()
750
751         # add an extra level of grandchildren that hang off a child
752         # that got created last time
753         new_parent = "OU=test_new_parent,%s" % children[0]
754         self.add_object(new_parent)
755         new_children = []
756
757         for x in range(0, 50):
758             dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
759             self.add_object(dn)
760             new_children.append(dn)
761
762         # replace half of the links to point to the new children
763         for x in range(0, 50):
764             self.delete_attribute(la_sources[x], "managedBy", children[x])
765             self.modify_object(la_sources[x], "managedBy", new_children[x])
766
767         # add some filler objects to fill up the 1st chunk
768         filler = self.create_object_range(0, 100, prefix="filler")
769
770         # modify the new children/targets so they come after the link source
771         for x in range(0, 50):
772             self.modify_object(new_children[x], "displayName", "OU-%d" % x)
773
774         # modify the parent, so it now comes last in the replication
775         self.modify_object(new_parent, "displayName", "OU%d" % x)
776
777         # We should now get the modified objects in the following order:
778         # [50 links (x 2)][100 filler][50 new children][new parent]
779         # Note that the link sources aren't actually sent (their new linked
780         # attributes are sent, but apart from that, nothing has changed)
781         all_objects = filler + new_children + [new_parent]
782         expected_links = la_sources[:50]
783
784         links_expected = True
785
786         while not self.replication_complete():
787             self.repl_get_next(assert_links=links_expected)
788             links_expected = len(self.rxd_links) < len(expected_links)
789
790         self.assertTrue(self.used_get_tgt,
791                         "Test didn't use the GET_TGT flag as expected")
792         self.assertTrue(self.used_get_anc,
793                         "Test didn't use the GET_ANC flag as expected")
794
795         # Check we get all the objects we're expecting
796         self.assert_expected_data(all_objects)
797
798         # Check we received links (50 deleted links and 50 new)
799         self.assert_expected_links(expected_links, num_expected=100)
800
801     def _repl_integrity_obj_deletion(self, delete_link_source=True):
802         """
803         Tests deleting link objects while a replication is in progress.
804         """
805
806         # create some objects and link them together, with some filler
807         # object in between the link sources
808         la_sources = self.create_object_range(0, 100, prefix="la_source")
809         la_targets = self.create_object_range(0, 100, prefix="la_targets")
810
811         for i in range(0, 50):
812             self.modify_object(la_sources[i], "managedBy", la_targets[i])
813
814         filler = self.create_object_range(0, 100, prefix="filler")
815
816         for i in range(50, 100):
817             self.modify_object(la_sources[i], "managedBy", la_targets[i])
818
819         # touch the targets so that the sources get replicated first
820         for i in range(0, 100):
821             self.modify_object(la_targets[i], "displayName", "OU%d" % i)
822
823         # objects should now be in the following USN order:
824         # [50 la_source][100 filler][50 la_source][100 la_target]
825
826         # Get the first block containing 50 link sources
827         self.repl_get_next()
828
829         # delete either the link targets or link source objects
830         if delete_link_source:
831             objects_to_delete = la_sources
832             # in GET_TGT testenvs we only receive the first 50 source objects
833             expected_objects = la_sources[:50] + la_targets + filler
834         else:
835             objects_to_delete = la_targets
836             expected_objects = la_sources + filler
837
838         for obj in objects_to_delete:
839             self.ldb_dc2.delete(obj)
840
841         # complete the replication
842         while not self.replication_complete():
843             self.repl_get_next()
844
845         # Check we get all the objects we're expecting
846         self.assert_expected_data(expected_objects)
847
848         # we can't use assert_expected_links() here because it tries to check
849         # against the deleted objects on the DC. (Although we receive some
850         # links from the first block processed, the Samba client should end up
851         # deleting these, as the source/target object involved is deleted)
852         self.assertTrue(len(self.rxd_links) == 50,
853                         "Expected 50 links, not %d" % len(self.rxd_links))
854
855     def test_repl_integrity_src_obj_deletion(self):
856         self._repl_integrity_obj_deletion(delete_link_source=True)
857
858     def test_repl_integrity_tgt_obj_deletion(self):
859         self._repl_integrity_obj_deletion(delete_link_source=False)
860
861     def restore_deleted_object(self, guid, new_dn):
862         """Re-animates a deleted object"""
863
864         guid_str = self._GUID_string(guid)
865         res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
866                                       attrs=["isDeleted"],
867                                       controls=['show_deleted:1'],
868                                       scope=ldb.SCOPE_BASE)
869         if len(res) != 1:
870             return
871
872         msg = ldb.Message()
873         msg.dn = res[0].dn
874         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
875                                               "isDeleted")
876         msg["distinguishedName"] = ldb.MessageElement([new_dn],
877                                                       ldb.FLAG_MOD_REPLACE,
878                                                       "distinguishedName")
879         self.test_ldb_dc.modify(msg, ["show_deleted:1"])
880
881     def sync_DCs(self, nc_dn=None):
882         # make sure DC1 has all the changes we've made to DC2
883         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
884                                 nc_dn=nc_dn)
885
886     def get_object_guid(self, dn):
887         res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
888                                       scope=ldb.SCOPE_BASE)
889         return res[0]['objectGUID'][0]
890
891     def set_dc_connection(self, conn):
892         """
893         Switches over the connection state info that the underlying drs_base
894         class uses so that we replicate with a different DC.
895         """
896         self.default_hwm = conn.default_hwm
897         self.default_utdv = conn.default_utdv
898         self.drs = conn.drs
899         self.drs_handle = conn.drs_handle
900         self.set_test_ldb_dc(conn.ldb_dc)
901
902     def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
903                                              expected_links):
904         """
905         Replicates against both the primary and secondary DCs in the testenv
906         and checks that both return the expected results.
907         """
908         print("Checking replication against primary test DC...")
909
910         # get the replication data from the test DC first
911         while not self.replication_complete():
912             self.repl_get_next()
913
914         # Check we get all the objects and links we're expecting
915         self.assert_expected_data(all_objects)
916         self.assert_expected_links(expected_links)
917
918         # switch over the DC state info so we now talk to the peer DC
919         self.set_dc_connection(peer_conn)
920         self.init_test_state()
921
922         print("Checking replication against secondary test DC...")
923
924         # check that we get the same information from the 2nd DC
925         while not self.replication_complete():
926             self.repl_get_next()
927
928         self.assert_expected_data(all_objects)
929         self.assert_expected_links(expected_links)
930
931         # switch back to using the default connection
932         self.set_dc_connection(self.default_conn)
933
934     def test_repl_integrity_obj_reanimation(self):
935         """
936         Checks receiving links for a re-animated object doesn't lose links.
937         We test this against the peer DC to make sure it doesn't drop links.
938         """
939
940         # This test is a little different in that we're particularly interested
941         # in exercising the replmd client code on the second DC.
942         # First, make sure the peer DC has the base OU, then connect to it (so
943         # we store its initial HWM)
944         self.sync_DCs()
945         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
946
947         # create the link source/target objects
948         la_sources = self.create_object_range(0, 100, prefix="la_src")
949         la_targets = self.create_object_range(0, 100, prefix="la_tgt")
950
951         # store the target object's GUIDs (we need to know these to
952         # reanimate them)
953         target_guids = []
954
955         for dn in la_targets:
956             target_guids.append(self.get_object_guid(dn))
957
958         # delete the link target
959         for x in range(0, 100):
960             self.ldb_dc2.delete(la_targets[x])
961
962         # sync the DCs, then disable replication. We want the peer DC to get
963         # all the following changes in a single replication cycle
964         self.sync_DCs()
965         self._disable_all_repl(self.dnsname_dc2)
966
967         # restore the target objects for the linked attributes again
968         for x in range(0, 100):
969             self.restore_deleted_object(target_guids[x], la_targets[x])
970
971         # add the links
972         for x in range(0, 100):
973             self.modify_object(la_sources[x], "managedBy", la_targets[x])
974
975         # create some additional filler objects
976         filler = self.create_object_range(0, 100, prefix="filler")
977
978         # modify the targets so they now come last
979         for x in range(0, 100):
980             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
981
982         # the objects should now be sent in the following order:
983         # [la sources + links][filler][la targets]
984         all_objects = la_sources + la_targets + filler
985         expected_links = la_sources
986
987         # Enable replication again make sure the 2 DCs are back in sync
988         self._enable_all_repl(self.dnsname_dc2)
989         self.sync_DCs()
990
991         # Get the replication data from each DC in turn.
992         # Check that both give us all the objects and links we're expecting,
993         # i.e. no links were lost
994         self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
995                                                   expected_links)
996
997     def test_repl_integrity_cross_partition_links(self):
998         """
999         Checks that a cross-partition link to an unknown target object does
1000         not result in missing links.
1001         """
1002
1003         # check the peer DC is up-to-date, then connect (storing its HWM)
1004         self.sync_DCs()
1005         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
1006
1007         # stop replication so the peer gets the following objects in one go
1008         self._disable_all_repl(self.dnsname_dc2)
1009
1010         # create a link source object in the main NC
1011         la_source = "OU=cross_nc_src,%s" % self.ou
1012         self.add_object(la_source)
1013
1014         # create the link target (a server object) in the config NC
1015         sites_dn = "CN=Sites,%s" % self.config_dn
1016         servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
1017         rand = random.randint(1, 10000000)
1018         la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
1019         self.add_object(la_target, objectclass="server")
1020
1021         # add a cross-partition link between the two
1022         self.modify_object(la_source, "managedBy", la_target)
1023
1024         # First, sync to the peer the NC containing the link source object
1025         self.sync_DCs()
1026
1027         # Now, before the peer has received the partition containing the target
1028         # object, try replicating from the peer. It will only know about half
1029         # of the link at this point, but it should be a valid scenario
1030         self.set_dc_connection(peer_conn)
1031
1032         while not self.replication_complete():
1033             # pretend we've received other link targets out of order and that's
1034             # forced us to use GET_TGT. This checks the peer doesn't fail
1035             # trying to fetch a cross-partition target object that doesn't
1036             # exist
1037             self.repl_get_next(get_tgt=True)
1038
1039         self.set_dc_connection(self.default_conn)
1040         self.init_test_state()
1041
1042         # Now sync across the partition containing the link target object
1043         self.sync_DCs(nc_dn=self.config_dn)
1044         self._enable_all_repl(self.dnsname_dc2)
1045
1046         # Get the replication data from each DC in turn.
1047         # Check that both return the cross-partition link (note we're not
1048         # checking the config domain NC here for simplicity)
1049         self.assert_DCs_replication_is_consistent(peer_conn,
1050                                                   all_objects=[la_source],
1051                                                   expected_links=[la_source])
1052
1053         # the cross-partition linked attribute has a missing backlink. Check
1054         # that we can still delete it successfully
1055         self.delete_attribute(la_source, "managedBy", la_target)
1056         self.sync_DCs()
1057
1058         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1059                                       attrs=["managedBy"],
1060                                       controls=['extended_dn:1:0'],
1061                                       scope=ldb.SCOPE_BASE)
1062         self.assertFalse("managedBy" in res[0],
1063                          "%s in DB still has managedBy attribute" % la_source)
1064         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1065                                       attrs=["managedBy"],
1066                                       controls=['extended_dn:1:0'],
1067                                       scope=ldb.SCOPE_BASE)
1068         self.assertFalse("managedBy" in res[0],
1069                          "%s in DB still has managedBy attribute" % la_source)
1070
1071         # Check receiving a cross-partition link to a deleted target.
1072         # Delete the target and make sure the deletion is sync'd between DCs
1073         target_guid = self.get_object_guid(la_target)
1074         self.test_ldb_dc.delete(la_target)
1075         self.sync_DCs(nc_dn=self.config_dn)
1076         self._disable_all_repl(self.dnsname_dc2)
1077
1078         # re-animate the target
1079         self.restore_deleted_object(target_guid, la_target)
1080         self.modify_object(la_source, "managedBy", la_target)
1081
1082         # now sync the link - because the target is in another partition, the
1083         # peer DC receives a link for a deleted target, which it should accept
1084         self.sync_DCs()
1085         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1086                                       attrs=["managedBy"],
1087                                       controls=['extended_dn:1:0'],
1088                                       scope=ldb.SCOPE_BASE)
1089         self.assertTrue("managedBy" in res[0],
1090                         "%s in DB missing managedBy attribute" % la_source)
1091
1092         # cleanup the server object we created in the Configuration partition
1093         self.test_ldb_dc.delete(la_target)
1094         self._enable_all_repl(self.dnsname_dc2)
1095
1096     def test_repl_get_tgt_multivalued_links(self):
1097         """Tests replication with multi-valued link attributes."""
1098
1099         # create the target/source objects and link them together
1100         la_targets = self.create_object_range(0, 500, prefix="la_tgt")
1101         la_source = "CN=la_src,%s" % self.ou
1102         self.add_object(la_source, objectclass="msExchConfigurationContainer")
1103
1104         for tgt in la_targets:
1105             self.modify_object(la_source, "addressBookRoots2", tgt)
1106
1107         filler = self.create_object_range(0, 100, prefix="filler")
1108
1109         # We should receive the objects/links in the following order:
1110         # [500 targets + 1 source][500 links][100 filler]
1111         expected_objects = la_targets + [la_source] + filler
1112         link_only_chunk = False
1113
1114         # First do the replication without needing GET_TGT
1115         while not self.replication_complete():
1116             ctr6 = self.repl_get_next()
1117
1118             if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
1119                 link_only_chunk = True
1120
1121         # we should receive one chunk that contains only links
1122         self.assertTrue(link_only_chunk,
1123                         "Expected to receive a chunk containing only links")
1124
1125         # check we received all the expected objects/links
1126         self.assert_expected_data(expected_objects)
1127         self.assert_expected_links([la_source], link_attr="addressBookRoots2",
1128                                    num_expected=500)
1129
1130         # Do the replication again, forcing the use of GET_TGT this time
1131         self.init_test_state()
1132
1133         for x in range(0, 500):
1134             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
1135
1136         # The objects/links should get sent in the following order:
1137         # [1 source][500 targets][500 links][100 filler]
1138
1139         while not self.replication_complete():
1140             ctr6 = self.repl_get_next()
1141
1142         self.assertTrue(self.used_get_tgt,
1143                         "Test didn't use the GET_TGT flag as expected")
1144
1145         # check we received all the expected objects/links
1146         self.assert_expected_data(expected_objects)
1147         self.assert_expected_links([la_source], link_attr="addressBookRoots2",
1148                                    num_expected=500)
1149
1150
1151 class DcConnection:
1152     """Helper class to track a connection to another DC"""
1153
1154     def __init__(self, drs_base, ldb_dc, dnsname_dc):
1155         self.ldb_dc = ldb_dc
1156         (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1157         (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1158         self.default_utdv = utdv