53be43d18e3b62aa1c604a4ee2f385d2dd5576ed
[samba.git] / python / samba / tests / samba_tool / user_get_kerberos_ticket.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Blackbox tests for getting Kerberos tickets from Group Managed Service Account and other (local) passwords
4 #
5 # Copyright (C) Catalyst.Net Ltd. 2023
6 #
7 # Written by Rob van der Linde <rob@catalyst.net.nz>
8 #
9 # Copyright Andrew Bartlett <abartlet@samba.org> 2023
10 #
11 # This program is free software; you can redistribute it and/or modify
12 # it under the terms of the GNU General Public License as published by
13 # the Free Software Foundation; either version 3 of the License, or
14 # (at your option) any later version.
15 #
16 # This program is distributed in the hope that it will be useful,
17 # but WITHOUT ANY WARRANTY; without even the implied warranty of
18 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 # GNU General Public License for more details.
20 #
21 # You should have received a copy of the GNU General Public License
22 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 #
24
25 import os
26 import sys
27
28 sys.path.insert(0, "bin/python")
29 os.environ["PYTHONUNBUFFERED"] = "1"
30
31 from ldb import SCOPE_BASE
32
33 from samba import credentials
34 from samba.credentials import MUST_USE_KERBEROS
35 from samba.dcerpc import security
36 from samba.domain.models import User
37 from samba.dsdb import UF_NORMAL_ACCOUNT, UF_WORKSTATION_TRUST_ACCOUNT
38 from samba.ndr import ndr_pack, ndr_unpack
39 from samba.tests import (BlackboxProcessError, BlackboxTestCase, connect_samdb,
40                          delete_force)
41
42 # If not specified, this is None, meaning local sam.ldb
43 PW_READ_URL = os.environ.get("PW_READ_URL")
44
45 # We still need to connect to a remote server to check we got the ticket
46 SERVER = os.environ.get("SERVER")
47
48 PW_CHECK_URL = f"ldap://{SERVER}"
49
50 # For authentication to PW_READ_URL if required
51 SERVER_USERNAME = os.environ["USERNAME"]
52 SERVER_PASSWORD = os.environ["PASSWORD"]
53
54 CREDS = f"-U{SERVER_USERNAME}%{SERVER_PASSWORD}"
55
56
57 class GetKerberosTicketTest(BlackboxTestCase):
58     """Blackbox tests for GMSA getpassword and connecting as that user."""
59
60     @classmethod
61     def setUpClass(cls):
62         cls.lp = cls.get_loadparm()
63         cls.env_creds = cls.get_env_credentials(lp=cls.lp,
64                                                 env_username="USERNAME",
65                                                 env_password="PASSWORD",
66                                                 env_domain="DOMAIN",
67                                                 env_realm="REALM")
68         if PW_READ_URL is None:
69             url = cls.lp.private_path("sam.ldb")
70         else:
71             url = PW_CHECK_URL
72         cls.samdb = connect_samdb(url, lp=cls.lp, credentials=cls.env_creds)
73         super().setUpClass()
74
75     @classmethod
76     def setUpTestData(cls):
77         cls.gmsa_username = "GMSA_K5Test_User$"
78         cls.username = "get-kerberos-ticket-test"
79         cls.user_base_dn = f"CN=Users,{cls.samdb.domain_dn()}"
80         cls.user_dn = f"CN={cls.username},{cls.user_base_dn}"
81         cls.gmsa_base_dn = f"CN=Managed Service Accounts,{cls.samdb.domain_dn()}"
82         cls.gmsa_user_dn = f"CN={cls.gmsa_username},{cls.gmsa_base_dn}"
83
84         msg = cls.samdb.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
85         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
86
87         domain_sid = security.dom_sid(cls.samdb.get_domain_sid())
88         allow_sddl = f"O:SYD:(A;;RP;;;{connecting_user_sid})"
89         allow_sd = ndr_pack(security.descriptor.from_sddl(allow_sddl, domain_sid))
90
91         details = {
92             "dn": str(cls.gmsa_user_dn),
93             "objectClass": "msDS-GroupManagedServiceAccount",
94             "msDS-ManagedPasswordInterval": "1",
95             "msDS-GroupMSAMembership": allow_sd,
96             "sAMAccountName": cls.gmsa_username,
97             "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT),
98         }
99
100         cls.samdb.add(details)
101         cls.addClassCleanup(delete_force, cls.samdb, cls.gmsa_user_dn)
102
103         user_password = "P@ssw0rd"
104         utf16pw = ('"' + user_password + '"').encode('utf-16-le')
105         user_details = {
106             "dn": str(cls.user_dn),
107             "objectClass": "user",
108             "sAMAccountName": cls.username,
109             "userAccountControl": str(UF_NORMAL_ACCOUNT),
110             "unicodePwd": utf16pw
111         }
112
113         cls.samdb.add(user_details)
114         cls.addClassCleanup(delete_force, cls.samdb, cls.user_dn)
115
116         cls.gmsa_user = User.get(cls.samdb, account_name=cls.gmsa_username)
117         cls.user = User.get(cls.samdb, account_name=cls.username)
118
119     def get_ticket(self, username, options=None):
120         if options is None:
121             options = ""
122         ccache_path = f"{self.tempdir}/ccache"
123         ccache_location = f"FILE:{ccache_path}"
124         cmd = f"user get-kerberos-ticket --output-krb5-ccache={ccache_location} {username} {options}"
125
126         try:
127             self.check_output(cmd)
128         except BlackboxProcessError as e:
129             self.fail(e)
130         self.addCleanup(os.unlink, ccache_path)
131         return ccache_location
132
133     def test_gmsa_ticket(self):
134         # Get a ticket with the tool
135         output_ccache = self.get_ticket(self.gmsa_username)
136         creds = self.insta_creds(template=self.env_creds)
137         creds.set_kerberos_state(MUST_USE_KERBEROS)
138         creds.set_named_ccache(output_ccache, credentials.SPECIFIED, self.lp)
139         db = connect_samdb(PW_CHECK_URL, credentials=creds, lp=self.lp)
140         msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
141         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
142
143         self.assertEqual(self.gmsa_user.object_sid, connecting_user_sid)
144
145     def test_user_ticket(self):
146         output_ccache = self.get_ticket(self.username)
147         # Get a ticket with the tool
148         creds = self.insta_creds(template=self.env_creds)
149         creds.set_kerberos_state(MUST_USE_KERBEROS)
150
151         # Currently this is based on reading the unicodePwd, but this should be expanded
152         creds.set_named_ccache(output_ccache, credentials.SPECIFIED, self.lp)
153
154         db = connect_samdb(PW_CHECK_URL, credentials=creds, lp=self.lp)
155
156         msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
157         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
158
159         self.assertEqual(self.user.object_sid, connecting_user_sid)
160
161     def test_user_ticket_gpg(self):
162         output_ccache = self.get_ticket(self.username, "--decrypt-samba-gpg")
163         # Get a ticket with the tool
164         creds = self.insta_creds(template=self.env_creds)
165         creds.set_kerberos_state(MUST_USE_KERBEROS)
166         creds.set_named_ccache(output_ccache, credentials.SPECIFIED, self.lp)
167         db = connect_samdb(PW_CHECK_URL, credentials=creds, lp=self.lp)
168
169         msg = db.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
170         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
171
172         self.assertEqual(self.user.object_sid, connecting_user_sid)
173
174     @classmethod
175     def _make_cmdline(cls, line):
176         """Override to pass line as samba-tool subcommand instead.
177
178         Automatically fills in HOST and CREDS as well.
179         """
180         if isinstance(line, list):
181             cmd = ["samba-tool"] + line
182             if PW_READ_URL is not None:
183                 cmd += ["-H", PW_READ_URL, CREDS]
184         else:
185             cmd = f"samba-tool {line}"
186             if PW_READ_URL is not None:
187                 cmd += "-H {PW_READ_URL} {CREDS}"
188
189         return super()._make_cmdline(cmd)
190
191
192 if __name__ == "__main__":
193     import unittest
194     unittest.main()