From: Bob Campbell Date: Fri, 27 Jan 2017 02:22:27 +0000 (+1300) Subject: python/tests: improve samba-tool replicate --local test X-Git-Tag: talloc-2.1.9~163 X-Git-Url: http://git.samba.org/samba.git/?p=nivanova%2Fsamba-autobuild%2F.git;a=commitdiff_plain;h=b73c2e66f590e592b77afcc96017be00ee64fdc4 python/tests: improve samba-tool replicate --local test It now makes sure that we only replicate incremental changes. Signed-off-by: Bob Campbell Reviewed-by: Andrew Bartlett Reviewed-by: Stefan Metzmacher Autobuild-User(master): Andrew Bartlett Autobuild-Date(master): Wed Feb 15 01:21:06 CET 2017 on sn-devel-144 --- diff --git a/source4/torture/drs/python/samba_tool_drs.py b/source4/torture/drs/python/samba_tool_drs.py index ccdba1de850..477529d45c8 100644 --- a/source4/torture/drs/python/samba_tool_drs.py +++ b/source4/torture/drs/python/samba_tool_drs.py @@ -1,5 +1,6 @@ # Blackbox tests for "samba-tool drs" command # Copyright (C) Kamen Mazdrashki 2011 +# Copyright (C) Andrew Bartlett 2017 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -36,6 +37,21 @@ class SambaToolDrsTests(drs_base.DrsBaseTestCase): self.cmdline_creds = "-U%s/%s%%%s" % (creds.get_domain(), creds.get_username(), creds.get_password()) + def tearDown(self): + self._enable_inbound_repl(self.dnsname_dc1) + self._enable_inbound_repl(self.dnsname_dc2) + + try: + shutil.rmtree(os.path.join(self.tempdir, "private")) + shutil.rmtree(os.path.join(self.tempdir, "etc")) + shutil.rmtree(os.path.join(self.tempdir, "msg.lock")) + os.remove(os.path.join(self.tempdir, "names.tdb")) + shutil.rmtree(os.path.join(self.tempdir, "state")) + except Exception: + pass + + super(SambaToolDrsTests, self).tearDown() + def _get_rootDSE(self, dc, ldap_only=True): samdb = samba.tests.connect_samdb(dc, lp=self.get_loadparm(), credentials=self.get_credentials(), @@ -154,12 +170,99 @@ class SambaToolDrsTests(drs_base.DrsBaseTestCase): # Output should be like 'Replicate from to was successful.' nc_name = self._get_rootDSE(self.dc1)["defaultNamingContext"] - out = self.check_output("samba-tool drs replicate --local %s %s %s %s" % (self.dc1, - self.dc2, - nc_name, - self.cmdline_creds)) - self.assertTrue("Incremental" in out) + + def get_num_obj_links(output): + num_objs = None + num_links = None + for word in output.split(" "): + try: + int(word) + if num_objs is None: + num_objs = int(word) + elif num_links is None: + num_links = int(word) + except ValueError: + pass + + return (num_objs, num_links) + + out = self.check_output("samba-tool drs replicate --local --full-sync %s %s %s %s" + % (self.dc1, self.dc2, nc_name, self.cmdline_creds)) + self.assertTrue("was successful" in out) + self.assertTrue("Full" in out) + + (first_obj, _) = get_num_obj_links(out) + + out = self.check_output("samba-tool drs replicate --local %s %s %s %s" + % (self.dc1, self.dc2, nc_name, self.cmdline_creds)) self.assertTrue("was successful" in out) + self.assertTrue("Incremental" in out) + + (second_obj, _) = get_num_obj_links(out) + + self.assertTrue(first_obj > second_obj) + + server_rootdse = self._get_rootDSE(self.dc1) + server_nc_name = server_rootdse["defaultNamingContext"] + server_ds_name = server_rootdse["dsServiceName"] + server_ldap_service_name = str(server_rootdse["ldapServiceName"][0]) + server_realm = server_ldap_service_name.split(":")[0] + creds = self.get_credentials() + + # We have to give it a different netbiosname every time + # it runs, otherwise the collision causes strange issues + # to happen. This should be different on different environments. + netbiosname = "test" + self.dc2 + if len(netbiosname) > 15: + netbiosname = netbiosname[:15] + + out = self.check_output("samba-tool domain join %s dc --server=%s %s --targetdir=%s --option=netbiosname=%s" + % (server_realm, self.dc1, self.cmdline_creds, self.tempdir, netbiosname)) + + new_dc_config_file = "%s/etc/smb.conf" % self.tempdir + + self.check_output("samba-tool drs replicate --local %s %s %s %s -s %s" + % ("invalid", self.dc1, nc_name, + self.cmdline_creds, new_dc_config_file)) + + self._disable_inbound_repl(self.dnsname_dc1) + self._disable_inbound_repl(self.dnsname_dc2) + + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2) + + # add an object with link on dc1 + group_name = "group-repl-local-%s" % self.dc2 + user_name = "user-repl-local-%s" % self.dc2 + + self.check_output("samba-tool group add %s %s -H ldap://%s" + % (group_name, self.cmdline_creds, self.dc1)) + self.check_output("samba-tool user add %s %s --random-password -H ldap://%s" + % (user_name, self.cmdline_creds, self.dc1)) + self.check_output("samba-tool group addmembers %s %s %s -H ldap://%s" + % (group_name, user_name, self.cmdline_creds, self.dc1)) + + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1) + + # pull that change with --local into local db from dc1: should send link and some objects + out = self.check_output("samba-tool drs replicate --local %s %s %s %s -s %s" + % ("invalid", self.dc1, nc_name, + self.cmdline_creds, new_dc_config_file)) + + (obj_1, link_1) = get_num_obj_links(out) + + self.assertEqual(obj_1, 2) + self.assertEqual(link_1, 1) + + # pull that change with --local into local db from dc2: shouldn't send link or object + # as we sent an up-to-dateness vector showing that we had already synced with DC1 + out = self.check_output("samba-tool drs replicate --local %s %s %s %s -s %s" + % ("invalid", self.dc2, nc_name, + self.cmdline_creds, new_dc_config_file)) + + (obj_2, link_2) = get_num_obj_links(out) + + self.assertEqual(obj_2, 0) + self.assertEqual(link_2, 0) def test_samba_tool_replicate_machine_creds_P(self): """Tests 'samba-tool drs replicate -P' command with machine creds.""" @@ -247,12 +350,6 @@ class SambaToolDrsTests(drs_base.DrsBaseTestCase): attrs=[]) self.assertRaises(ldb.LdbError, check_dns_account_obj) - shutil.rmtree(os.path.join(self.tempdir, "private")) - shutil.rmtree(os.path.join(self.tempdir, "etc")) - shutil.rmtree(os.path.join(self.tempdir, "msg.lock")) - os.remove(os.path.join(self.tempdir, "names.tdb")) - shutil.rmtree(os.path.join(self.tempdir, "state")) - def test_samba_tool_drs_clone_dc_secrets(self): """Tests 'samba-tool drs clone-dc-database --include-secrets' command .""" server_rootdse = self._get_rootDSE(self.dc1) @@ -325,12 +422,6 @@ class SambaToolDrsTests(drs_base.DrsBaseTestCase): attrs=[]) self.assertRaises(ldb.LdbError, check_dns_account_obj) - shutil.rmtree(os.path.join(self.tempdir, "private")) - shutil.rmtree(os.path.join(self.tempdir, "etc")) - shutil.rmtree(os.path.join(self.tempdir, "msg.lock")) - os.remove(os.path.join(self.tempdir, "names.tdb")) - shutil.rmtree(os.path.join(self.tempdir, "state")) - def test_samba_tool_drs_clone_dc_secrets_without_targetdir(self): """Tests 'samba-tool drs clone-dc-database' command without --targetdir.""" server_rootdse = self._get_rootDSE(self.dc1)