s4-tests: Tests for Validated-SPN implementation.
[kai/samba.git] / source4 / dsdb / tests / python / acl.py
index 12f653b2f220290edaeea102d23bb1389f18e2fd..977221fe1197f63b6c326ca3fc91b4c5495b6736 100755 (executable)
@@ -21,10 +21,10 @@ from ldb import ERR_OPERATIONS_ERROR
 from ldb import Message, MessageElement, Dn
 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
 from samba.ndr import ndr_pack, ndr_unpack
-from samba.dcerpc import security
+from samba.dcerpc import security, drsuapi, misc, netlogon, nbt
 
 from samba.auth import system_session
-from samba import gensec, sd_utils
+from samba import gensec, sd_utils, join
 from samba.samdb import SamDB
 from samba.credentials import Credentials
 import samba.tests
@@ -47,6 +47,12 @@ if len(args) < 1:
     sys.exit(1)
 
 host = args[0]
+if not "://" in host:
+    ldaphost = "ldap://%s" % host
+else:
+    ldaphost = host
+    start = host.rindex("://")
+    host = host.lstrip(start+3)
 
 lp = sambaopts.get_loadparm()
 creds = credopts.get_credentials(lp)
@@ -87,7 +93,7 @@ class AclTests(samba.tests.TestCase):
         creds_tmp.set_workstation(creds.get_workstation())
         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                       | gensec.FEATURE_SEAL)
-        ldb_target = SamDB(url=host, credentials=creds_tmp, lp=lp)
+        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
         return ldb_target
 
     # Test if we have any additional groups for users than default ones
@@ -238,7 +244,7 @@ class AclAddTests(AclTests):
 
     def test_add_anonymous(self):
         """Test add operation with anonymous user"""
-        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
         try:
             anonymous.newuser("test_add_anonymous", self.user_pass)
         except LdbError, (num, _):
@@ -574,7 +580,7 @@ Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
 
     def test_modify_anonymous(self):
         """Test add operation with anonymous user"""
-        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
         self.ldb_admin.newuser("test_anonymous", "samba123@")
         m = Message()
         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
@@ -658,7 +664,7 @@ class AclSearchTests(AclTests):
 
     def test_search_anonymous1(self):
         """Verify access of rootDSE with the correct request"""
-        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
         self.assertEquals(len(res), 1)
         #verify some of the attributes
@@ -674,7 +680,7 @@ class AclSearchTests(AclTests):
 
     def test_search_anonymous2(self):
         """Make sure we cannot access anything else"""
-        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
         try:
             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
         except LdbError, (num, _):
@@ -702,7 +708,7 @@ class AclSearchTests(AclTests):
         mod = "(A;CI;LC;;;AN)"
         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
-        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
         self.assertEquals(len(res), 1)
@@ -997,7 +1003,7 @@ class AclDeleteTests(AclTests):
 
     def test_delete_anonymous(self):
         """Test add operation with anonymous user"""
-        anonymous = SamDB(url=host, credentials=self.creds_tmp, lp=lp)
+        anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
         self.ldb_admin.newuser("test_anonymous", "samba123@")
 
         try:
@@ -1556,11 +1562,265 @@ class AclExtendedTests(AclTests):
         self.assertEqual(len(res),1)
         self.assertTrue("nTSecurityDescriptor" in res[0].keys())
 
+class AclSPNTests(AclTests):
+
+    def setUp(self):
+        super(AclSPNTests, self).setUp()
+        self.dcname = "TESTSRV8"
+        self.rodcname = "TESTRODC8"
+        self.computername = "testcomp8"
+        self.test_user = "spn_test_user8"
+        self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
+        self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
+        self.site = "Default-First-Site-Name"
+        self.rodcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.rodcname,
+                                          targetdir=None, domain=None)
+        self.dcctx = samba.join.dc_join(server=host, creds=creds, lp=lp, site=self.site, netbios_name=self.dcname,
+                                        targetdir=None, domain=None)
+        self.ldb_admin.newuser(self.test_user, self.user_pass)
+        self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
+        self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
+        self.create_computer(self.computername, self.dcctx.dnsdomain)
+        self.create_rodc(self.rodcctx)
+        self.create_dc(self.dcctx)
+
+    def tearDown(self):
+        super(AclSPNTests, self).tearDown()
+        self.rodcctx.cleanup_old_join()
+        self.dcctx.cleanup_old_join()
+        delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
+        delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
+
+    def replace_spn(self, _ldb, dn, spn):
+        print "Setting spn %s on %s" % (spn, dn)
+        res = self.ldb_admin.search(dn, expression="(objectClass=*)",
+                                    scope=SCOPE_BASE, attrs=["servicePrincipalName"])
+        if "servicePrincipalName" in res[0].keys():
+            flag = FLAG_MOD_REPLACE
+        else:
+            flag = FLAG_MOD_ADD
+
+        msg = Message()
+        msg.dn = Dn(self.ldb_admin, dn)
+        msg["servicePrincipalName"] = MessageElement(spn, flag,
+                                                         "servicePrincipalName")
+        _ldb.modify(msg)
+
+    def create_computer(self, computername, domainname):
+        dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
+        samaccountname = computername + "$"
+        dnshostname = "%s.%s" % (computername, domainname)
+        self.ldb_admin.add({
+            "dn": dn,
+            "objectclass": "computer",
+            "sAMAccountName": samaccountname,
+            "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
+            "dNSHostName": dnshostname})
+
+    # same as for join_RODC, but do not set any SPNs
+    def create_rodc(self, ctx):
+         ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
+
+         ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+                                  "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+                                  "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+                                  "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+                                  "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
+         ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
+
+         mysid = ctx.get_mysid()
+         admin_dn = "<SID=%s>" % mysid
+         ctx.managedby = admin_dn
+
+         ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
+                              samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
+                              samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
+
+         ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
+         ctx.secure_channel_type = misc.SEC_CHAN_RODC
+         ctx.RODC = True
+         ctx.replica_flags  =  (drsuapi.DRSUAPI_DRS_INIT_SYNC |
+                                drsuapi.DRSUAPI_DRS_PER_SYNC |
+                                drsuapi.DRSUAPI_DRS_GET_ANC |
+                                drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
+                                drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
+
+         ctx.join_add_objects()
+
+    def create_dc(self, ctx):
+        ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
+        ctx.secure_channel_type = misc.SEC_CHAN_BDC
+        ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
+                             drsuapi.DRSUAPI_DRS_INIT_SYNC |
+                             drsuapi.DRSUAPI_DRS_PER_SYNC |
+                             drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
+                             drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
+
+        ctx.join_add_objects()
+
+    def dc_spn_test(self, ctx):
+        netbiosdomain = self.dcctx.get_domain_name()
+        try:
+            self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
+        mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
+        self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
+                         (ctx.myname, ctx.dnsdomain, netbiosdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
+                         (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
+                         (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
+                         (ctx.myname, ctx.dnsdomain, netbiosdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
+                         (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
+                         (ctx.myname, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
+                         (ctx.myname))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
+                         (ctx.myname, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
+                         (ctx.myname, ctx.dnsdomain))
+        self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
+                         (ctx.ntds_guid, ctx.dnsdomain))
+
+        #the following spns do not match the restrictions and should fail
+        try:
+            self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
+                             (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
+                             (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
+                             (ctx.myname, ctx.dnsdomain, netbiosdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
+                             (ctx.ntds_guid, ctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+    def test_computer_spn(self):
+        # with WP, any value can be set
+        netbiosdomain = self.dcctx.get_domain_name()
+        self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
+                         (self.computername, netbiosdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
+        self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain, netbiosdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
+                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
+                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain, netbiosdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
+                         (self.computername))
+        self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
+                         (self.computername, self.dcctx.dnsdomain))
+        self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
+
+        #user has neither WP nor Validated-SPN, access denied expected
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
+        mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
+        self.sd_utils.dacl_add_ace(self.computerdn, mod)
+        #grant Validated-SPN and check which values are accepted
+        #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
+
+        # for regular computer objects we shouldalways get constraint violation
+
+        # This does not pass against Windows, although it should according to docs
+        self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
+        self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
+                             (self.computername, self.dcctx.dnsdomain))
+
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
+                             (self.computername, self.dcctx.dnsdomain, netbiosdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
+                             (self.computername, self.dcctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
+                             (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
+                             (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+        try:
+            self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
+                             (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
+
+    def test_spn_rwdc(self):
+        self.dc_spn_test(self.dcctx)
+
+    def test_spn_rodc(self):
+        self.dc_spn_test(self.rodcctx)
+
+
 # Important unit running information
 
-if not "://" in host:
-    host = "ldap://%s" % host
-ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
+ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
 
 runner = SubunitTestRunner()
 rc = 0
@@ -1592,6 +1852,6 @@ ldb.set_minPwdAge(minPwdAge)
 
 if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
     rc = 1
-
-
+if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful():
+    rc = 1
 sys.exit(rc)