tests/getdcname: Add a number of tests for GetDCNameEx
authorGarming Sam <garming@catalyst.net.nz>
Wed, 28 Mar 2018 04:16:25 +0000 (17:16 +1300)
committerGarming Sam <garming@samba.org>
Fri, 4 May 2018 04:12:10 +0000 (06:12 +0200)
This will test the winbind forwarding to deal with sites that the target
DC does not exist in.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=13365

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/getdcname.py [new file with mode: 0644]
selftest/knownfail.d/getdcname [new file with mode: 0644]
source4/selftest/tests.py

diff --git a/python/samba/tests/getdcname.py b/python/samba/tests/getdcname.py
new file mode 100644 (file)
index 0000000..9f8f993
--- /dev/null
@@ -0,0 +1,459 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""
+    Tests GetDCNameEx calls in NETLOGON
+"""
+
+from samba import auth
+from samba import WERRORError, werror
+import samba.tests
+import time
+import json
+import os
+from samba.credentials import Credentials
+from samba.dcerpc import netlogon
+from samba.tests import delete_force
+from samba.dcerpc.misc import GUID
+
+
+class GetDCNameEx(samba.tests.TestCase):
+
+    def setUp(self):
+        self.lp = samba.tests.env_loadparm()
+        self.creds = Credentials()
+
+        self.netlogon_conn = None
+        self.server = os.environ.get('SERVER')
+        self.realm = os.environ.get('REALM')
+        self.domain = os.environ.get('DOMAIN')
+        self.trust_realm = os.environ.get('TRUST_REALM')
+        self.trust_domain = os.environ.get('TRUST_DOMAIN')
+
+    def _call_get_dc_name(self, domain=None, domain_guid=None,
+                          site_name=None, ex2=False, flags=0):
+        if self.netlogon_conn is None:
+            self.netlogon_conn = netlogon.netlogon("ncalrpc:[schannel]",
+                                                   self.get_loadparm())
+
+        if ex2:
+            return self.netlogon_conn.netr_DsRGetDCNameEx2(self.server,
+                                                           None, 0,
+                                                           domain,
+                                                           domain_guid,
+                                                           site_name,
+                                                           flags)
+        else:
+            return self.netlogon_conn.netr_DsRGetDCNameEx(self.server,
+                                                          domain,
+                                                          domain_guid,
+                                                          site_name,
+                                                          flags)
+
+    def test_get_dc_ex2(self):
+        """Check the most trivial requirements of Ex2 (no domain or site)
+
+        a) The paths are prefixed with two backslashes
+        b) The returned domains conform to the format requested
+        c) The domain matches our own domain
+        """
+        response = self._call_get_dc_name(ex2=True)
+
+        self.assertTrue(response.dc_unc is not None)
+        self.assertTrue(response.dc_unc.startswith('\\\\'))
+        self.assertTrue(response.dc_address is not None)
+        self.assertTrue(response.dc_address.startswith('\\\\'))
+
+        self.assertTrue(response.domain_name.lower() ==
+                        self.realm.lower() or
+                        response.domain_name.lower() ==
+                        self.domain.lower())
+
+        response = self._call_get_dc_name(ex2=True,
+                                          flags=netlogon.DS_RETURN_DNS_NAME)
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+        response = self._call_get_dc_name(ex2=True,
+                                          flags=netlogon.DS_RETURN_FLAT_NAME)
+        self.assertEqual(response.domain_name.lower(),
+                         self.domain.lower())
+
+    def test_get_dc_over_winbind_ex2(self):
+        """Check what happens to Ex2 requests after being forwarded to winbind
+
+        a) The paths must still have the same backslash prefixes
+        b) The returned domain does not match our own domain
+        c) The domain matches the format requested
+        """
+        if self.trust_realm is None:
+            return
+
+        response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                ex2=True)
+        response = self._call_get_dc_name(domain=self.realm,
+                                          ex2=True)
+
+        self.assertTrue(response_trust.dc_unc is not None)
+        self.assertTrue(response_trust.dc_unc.startswith('\\\\'))
+        self.assertTrue(response_trust.dc_address is not None)
+        self.assertTrue(response_trust.dc_address.startswith('\\\\'))
+
+        self.assertNotEqual(response_trust.dc_unc,
+                            response.dc_unc)
+        self.assertNotEqual(response_trust.dc_address,
+                            response.dc_address)
+
+        self.assertTrue(response_trust.domain_name.lower() ==
+                        self.trust_realm.lower() or
+                        response_trust.domain_name.lower() ==
+                        self.trust_domain.lower())
+
+        response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                flags=netlogon.DS_RETURN_DNS_NAME,
+                                                ex2=True)
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+        response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                flags=netlogon.DS_RETURN_FLAT_NAME,
+                                                ex2=True)
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_domain.lower())
+
+    def test_get_dc_over_winbind(self):
+        """Test the standard Ex version (not Ex2)
+
+        Ex calls Ex2 anyways, from now on, just test Ex.
+        """
+        if self.trust_realm is None:
+            return
+
+        response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                flags=netlogon.DS_RETURN_DNS_NAME)
+
+        self.assertTrue(response_trust.dc_unc is not None)
+        self.assertTrue(response_trust.dc_unc.startswith('\\\\'))
+        self.assertTrue(response_trust.dc_address is not None)
+        self.assertTrue(response_trust.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+    def test_get_dc_over_winbind_with_site(self):
+        """Test the standard Ex version (not Ex2)
+
+        We assume that there is a Default-First-Site-Name site.
+        """
+        if self.trust_realm is None:
+            return
+
+        site = 'Default-First-Site-Name'
+        response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                site_name=site,
+                                                flags=netlogon.DS_RETURN_DNS_NAME)
+
+        self.assertTrue(response_trust.dc_unc is not None)
+        self.assertTrue(response_trust.dc_unc.startswith('\\\\'))
+        self.assertTrue(response_trust.dc_address is not None)
+        self.assertTrue(response_trust.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+        self.assertEqual(site.lower(), response_trust.dc_site_name.lower())
+
+    def test_get_dc_over_winbind_invalid_site(self):
+        """Test the standard Ex version (not Ex2)
+
+        We assume that there is no Invalid-First-Site-Name site.
+        """
+        if self.trust_realm is None:
+            return
+
+        site = 'Invalid-First-Site-Name'
+        try:
+            response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                    site_name=site,
+                                                    flags=netlogon.DS_RETURN_DNS_NAME,
+                                                    ex2=False)
+            self.fail("Failed to give the correct error for incorrect site")
+        except WERRORError as e:
+            enum, estr = e.args
+            if enum != werror.WERR_NO_SUCH_DOMAIN:
+                self.fail("Failed to detect an invalid site name")
+
+    def test_get_dc_over_winbind_invalid_site_ex2(self):
+        """Test the Ex2 version.
+
+        We assume that there is no Invalid-First-Site-Name site.
+        """
+        if self.trust_realm is None:
+            return
+
+        site = 'Invalid-First-Site-Name'
+        try:
+            response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                    site_name=site,
+                                                    flags=netlogon.DS_RETURN_DNS_NAME,
+                                                    ex2=True)
+            self.fail("Failed to give the correct error for incorrect site")
+        except WERRORError as e:
+            enum, estr = e.args
+            if enum != werror.WERR_NO_SUCH_DOMAIN:
+                self.fail("Failed to detect an invalid site name")
+
+    def test_get_dc_over_winbind_empty_string_site(self):
+        """Test the standard Ex version (not Ex2)
+
+        We assume that there is a Default-First-Site-Name site.
+        """
+        if self.trust_realm is None:
+            return
+
+        site = ''
+        try:
+            response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                    site_name=site,
+                                                    flags=netlogon.DS_RETURN_DNS_NAME)
+        except WERRORError as e:
+            self.fail("Unable to get empty string site result: " + str(e))
+
+        self.assertTrue(response_trust.dc_unc is not None)
+        self.assertTrue(response_trust.dc_unc.startswith('\\\\'))
+        self.assertTrue(response_trust.dc_address is not None)
+        self.assertTrue(response_trust.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+        self.assertTrue(response_trust.dc_site_name is not None)
+        self.assertNotEqual('', response_trust.dc_site_name)
+
+    def test_get_dc_over_winbind_netbios(self):
+        """Supply a NETBIOS trust domain name."""
+        if self.trust_realm is None:
+            return
+
+        try:
+            response_trust = self._call_get_dc_name(domain=self.trust_domain,
+                                                    flags=netlogon.DS_RETURN_DNS_NAME,
+                                                    ex2=False)
+        except WERRORError as e:
+            self.fail("Failed to succeed over winbind: " + str(e))
+
+        self.assertTrue(response_trust is not None)
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+    def test_get_dc_over_winbind_with_site_netbios(self):
+        """Supply a NETBIOS trust domain name.
+
+        Sporadically fails because NETBIOS queries do not return site name in
+        winbind. The site check in NETLOGON will trigger and fail the request.
+
+        Currently marked in flapping...
+        """
+        if self.trust_realm is None:
+            return
+
+        site = 'Default-First-Site-Name'
+        try:
+            response_trust = self._call_get_dc_name(domain=self.trust_domain,
+                                                    site_name=site,
+                                                    flags=netlogon.DS_RETURN_DNS_NAME,
+                                                    ex2=False)
+        except WERRORError as e:
+            self.fail("Failed to succeed over winbind: " + str(e))
+
+        self.assertTrue(response_trust is not None)
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+        self.assertEqual(site.lower(), response_trust.dc_site_name.lower())
+
+    def test_get_dc_over_winbind_domain_guid(self):
+        """Ensure that we do not reject requests supplied with a NULL GUID"""
+
+        if self.trust_realm is None:
+            return
+
+        null_guid = GUID()
+        try:
+            response_trust = self._call_get_dc_name(domain=self.trust_realm,
+                                                    domain_guid=null_guid,
+                                                    flags=netlogon.DS_RETURN_DNS_NAME)
+        except WERRORError as e:
+            self.fail("Unable to get NULL domain GUID result: " + str(e))
+
+        self.assertTrue(response_trust.dc_unc is not None)
+        self.assertTrue(response_trust.dc_unc.startswith('\\\\'))
+        self.assertTrue(response_trust.dc_address is not None)
+        self.assertTrue(response_trust.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response_trust.domain_name.lower(),
+                         self.trust_realm.lower())
+
+    def test_get_dc_with_site(self):
+        """Test the standard Ex version (not Ex2)
+
+        We assume that there is a Default-First-Site-Name site.
+        """
+
+        site = 'Default-First-Site-Name'
+        response = self._call_get_dc_name(domain=self.realm,
+                                          site_name=site,
+                                          flags=netlogon.DS_RETURN_DNS_NAME)
+
+        self.assertTrue(response.dc_unc is not None)
+        self.assertTrue(response.dc_unc.startswith('\\\\'))
+        self.assertTrue(response.dc_address is not None)
+        self.assertTrue(response.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+        self.assertEqual(site.lower(), response.dc_site_name.lower())
+
+    def test_get_dc_invalid_site(self):
+        """Test the standard Ex version (not Ex2)
+
+        We assume that there is no Invalid-First-Site-Name site.
+        """
+        if self.realm is None:
+            return
+
+        site = 'Invalid-First-Site-Name'
+        try:
+            response = self._call_get_dc_name(domain=self.realm,
+                                              site_name=site,
+                                              flags=netlogon.DS_RETURN_DNS_NAME,
+                                              ex2=False)
+            self.fail("Failed to give the correct error for incorrect site")
+        except WERRORError as e:
+            enum, estr = e.args
+            if enum != werror.WERR_NO_SUCH_DOMAIN:
+                self.fail("Failed to detect an invalid site name")
+
+    def test_get_dc_invalid_site_ex2(self):
+        """Test the Ex2 version
+
+        We assume that there is no Invalid-First-Site-Name site.
+        """
+
+        site = 'Invalid-First-Site-Name'
+        try:
+            response = self._call_get_dc_name(domain=self.realm,
+                                              site_name=site,
+                                              flags=netlogon.DS_RETURN_DNS_NAME,
+                                              ex2=True)
+            self.fail("Failed to give the correct error for incorrect site")
+        except WERRORError as e:
+            enum, estr = e.args
+            if enum != werror.WERR_NO_SUCH_DOMAIN:
+                self.fail("Failed to detect an invalid site name")
+
+    def test_get_dc_empty_string_site(self):
+        """Test the standard Ex version (not Ex2)
+
+        We assume that there is a Default-First-Site-Name site.
+        """
+
+        site = ''
+        try:
+            response = self._call_get_dc_name(domain=self.realm,
+                                              site_name=site,
+                                              flags=netlogon.DS_RETURN_DNS_NAME)
+        except WERRORError as e:
+            self.fail("Unable to get empty string site result: " + str(e))
+
+        self.assertTrue(response.dc_unc is not None)
+        self.assertTrue(response.dc_unc.startswith('\\\\'))
+        self.assertTrue(response.dc_address is not None)
+        self.assertTrue(response.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+        self.assertTrue(response.dc_site_name is not None)
+        self.assertNotEqual('', response.dc_site_name)
+
+    def test_get_dc_netbios(self):
+        """Supply a NETBIOS domain name."""
+
+        try:
+            response = self._call_get_dc_name(domain=self.domain,
+                                              flags=netlogon.DS_RETURN_DNS_NAME,
+                                              ex2=False)
+        except WERRORError as e:
+            self.fail("Failed to succeed over winbind: " + str(e))
+
+        self.assertTrue(response is not None)
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+    def test_get_dc_with_site_netbios(self):
+        """Supply a NETBIOS domain name."""
+
+        site = 'Default-First-Site-Name'
+        try:
+            response = self._call_get_dc_name(domain=self.domain,
+                                              site_name=site,
+                                              flags=netlogon.DS_RETURN_DNS_NAME,
+                                              ex2=False)
+        except WERRORError as e:
+            self.fail("Failed to succeed over winbind: " + str(e))
+
+        self.assertTrue(response is not None)
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+        self.assertEqual(site.lower(), response.dc_site_name.lower())
+
+    def test_get_dc_with_domain_guid(self):
+        """Ensure that we do not reject requests supplied with a NULL GUID"""
+
+        null_guid = GUID()
+        response = self._call_get_dc_name(domain=self.realm,
+                                          domain_guid=null_guid,
+                                          flags=netlogon.DS_RETURN_DNS_NAME)
+
+        self.assertTrue(response.dc_unc is not None)
+        self.assertTrue(response.dc_unc.startswith('\\\\'))
+        self.assertTrue(response.dc_address is not None)
+        self.assertTrue(response.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+    def test_get_dc_with_empty_string_domain(self):
+        """Ensure that empty domain resolve to the DC domain"""
+        response = self._call_get_dc_name(domain='',
+                                          flags=netlogon.DS_RETURN_DNS_NAME)
+
+        self.assertTrue(response.dc_unc is not None)
+        self.assertTrue(response.dc_unc.startswith('\\\\'))
+        self.assertTrue(response.dc_address is not None)
+        self.assertTrue(response.dc_address.startswith('\\\\'))
+
+        self.assertEqual(response.domain_name.lower(),
+                         self.realm.lower())
+
+    # TODO Thorough tests of domain GUID
+    #
+    # The domain GUID does not seem to be authoritative, and seems to be a
+    # fallback case for renamed domains.
diff --git a/selftest/knownfail.d/getdcname b/selftest/knownfail.d/getdcname
new file mode 100644 (file)
index 0000000..1c086a3
--- /dev/null
@@ -0,0 +1,8 @@
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_empty_string_site.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind_domain_guid.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind_empty_string_site.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind_ex2.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind_netbios.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind_with_site.fl2008r2dc:local.*
+^samba.tests.getdcname.samba.tests.getdcname.GetDCNameEx.test_get_dc_over_winbind_with_site_netbios.fl2008r2dc:local.*
index d1e5bc6a50955aa1f0e1e5af10da7dde95537e34..ecf2c2146fb81a9c166d1ee07913a09da27dadb9 100755 (executable)
@@ -674,6 +674,11 @@ if have_heimdal_support:
                            extra_args=['-U"$USERNAME%$PASSWORD"'],
                            environ={'CLIENT_IP': '127.0.0.11',
                                     'SOCKET_WRAPPER_DEFAULT_IFACE': 11})
+
+planoldpythontestsuite("fl2008r2dc:local",
+                       "samba.tests.getdcname",
+                       extra_args=['-U"$USERNAME%$PASSWORD"'])
+
 planoldpythontestsuite("ad_dc",
                        "samba.tests.net_join_no_spnego",
                        extra_args=['-U"$USERNAME%$PASSWORD"'])