s4:dsdb: Fix stack use after scope in gkdi_create_root_key()
[samba.git] / python / samba / tests / dckeytab.py
1 # Tests for source4/libnet/py_net_dckeytab.c
2 #
3 # Copyright (C) David Mulder <dmulder@suse.com> 2018
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import os
20 import subprocess
21 from samba.net import Net
22 from samba import enable_net_export_keytab
23
24 from samba import credentials, dsdb, ntstatus, NTSTATUSError
25 from samba.dcerpc import krb5ccache, security
26 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT
27 from samba.ndr import ndr_unpack, ndr_pack
28 from samba.param import LoadParm
29 from samba.samdb import SamDB
30 from samba.tests import TestCaseInTempDir, delete_force
31
32 from ldb import SCOPE_BASE
33
34 enable_net_export_keytab()
35
36
37 class DCKeytabTests(TestCaseInTempDir):
38     def setUp(self):
39         super().setUp()
40         self.lp = LoadParm()
41         self.lp.load_default()
42         self.creds = self.insta_creds(template=self.get_credentials())
43         self.samdb = SamDB(url=f"ldap://{os.environ.get('SERVER')}",
44                            credentials=self.creds,
45                            lp=self.lp)
46
47         self.ktfile = os.path.join(self.tempdir, 'test.keytab')
48         self.principal = self.creds.get_principal()
49
50     def tearDown(self):
51         super().tearDown()
52
53     def keytab_as_set(self, keytab_bytes):
54         def entry_to_tuple(entry):
55             principal = '/'.join(entry.principal.components) + f"@{entry.principal.realm}"
56             enctype = entry.enctype
57             kvno = entry.key_version
58             key = bytes(entry.key.data)
59             return (principal, enctype, kvno, key)
60
61         keytab = ndr_unpack(krb5ccache.KEYTAB, keytab_bytes)
62         entry = keytab.entry
63
64         keytab_as_set = set()
65
66         entry_as_tuple = entry_to_tuple(entry)
67         keytab_as_set.add(entry_as_tuple)
68
69         keytab_bytes = keytab.further_entry
70         while keytab_bytes:
71             multiple_entry = ndr_unpack(krb5ccache.MULTIPLE_KEYTAB_ENTRIES, keytab_bytes)
72             entry = multiple_entry.entry
73             entry_as_tuple = entry_to_tuple(entry)
74             self.assertNotIn(entry_as_tuple, keytab_as_set)
75             keytab_as_set.add(entry_as_tuple)
76
77             keytab_bytes = multiple_entry.further_entry
78
79         return keytab_as_set
80
81     def test_export_keytab(self):
82         net = Net(None, self.lp)
83         self.addCleanup(self.rm_files, self.ktfile)
84         net.export_keytab(keytab=self.ktfile, principal=self.principal)
85         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
86
87         # Parse the first entry in the keytab
88         with open(self.ktfile, 'rb') as bytes_kt:
89             keytab_bytes = bytes_kt.read()
90
91         # confirm only this principal was exported
92         for entry in self.keytab_as_set(keytab_bytes):
93             (principal, enctype, kvno, key) = entry
94             self.assertEqual(principal, self.principal)
95
96     def test_export_keytab_all(self):
97         net = Net(None, self.lp)
98         self.addCleanup(self.rm_files, self.ktfile)
99         net.export_keytab(keytab=self.ktfile)
100         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
101
102         with open(self.ktfile, 'rb') as bytes_kt:
103             keytab_bytes = bytes_kt.read()
104
105         # Parse the keytab
106         keytab_as_set = self.keytab_as_set(keytab_bytes)
107
108         # confirm many principals were exported
109         self.assertGreater(len(keytab_as_set), 10)
110
111     def test_export_keytab_all_keep_stale(self):
112         net = Net(None, self.lp)
113         self.addCleanup(self.rm_files, self.ktfile)
114         net.export_keytab(keytab=self.ktfile)
115
116         new_principal=f"keytab_testuser@{self.creds.get_realm()}"
117         self.samdb.newuser("keytab_testuser", "4rfvBGT%")
118         self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
119
120         net.export_keytab(keytab=self.ktfile, keep_stale_entries=True)
121
122         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
123
124         with open(self.ktfile, 'rb') as bytes_kt:
125             keytab_bytes = bytes_kt.read()
126
127         # confirm many principals were exported
128         # self.keytab_as_set() will also check we only got it
129         # each entry once
130         keytab_as_set = self.keytab_as_set(keytab_bytes)
131
132         self.assertGreater(len(keytab_as_set), 10)
133
134         # Look for the new principal, showing this was updated
135         found = False
136         for entry in keytab_as_set:
137             (principal, enctype, kvno, key) = entry
138             if principal == new_principal:
139                 found = True
140
141         self.assertTrue(found)
142
143     def test_export_keytab_nochange_update(self):
144         new_principal=f"keytab_testuser@{self.creds.get_realm()}"
145         self.samdb.newuser("keytab_testuser", "4rfvBGT%")
146         self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
147
148         net = Net(None, self.lp)
149         self.addCleanup(self.rm_files, self.ktfile)
150         net.export_keytab(keytab=self.ktfile, principal=new_principal)
151         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
152
153         cmd = ['klist', '-K', '-C', '-t', '-k', self.ktfile]
154         keytab_orig_content = subprocess.Popen(
155             cmd,
156             shell=False,
157             stdout=subprocess.PIPE,
158             stderr=subprocess.STDOUT,
159         ).communicate()[0]
160
161         with open(self.ktfile, 'rb') as bytes_kt:
162             keytab_orig_bytes = bytes_kt.read()
163
164         net.export_keytab(keytab=self.ktfile, principal=new_principal)
165         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
166
167         keytab_content = subprocess.Popen(
168             cmd,
169             shell=False,
170             stdout=subprocess.PIPE,
171             stderr=subprocess.STDOUT,
172         ).communicate()[0]
173
174         self.assertEqual(keytab_orig_content, keytab_content)
175
176         # Parse the first entry in the keytab
177         with open(self.ktfile, 'rb') as bytes_kt:
178             keytab_bytes = bytes_kt.read()
179
180         self.assertEqual(keytab_orig_bytes, keytab_bytes)
181
182         # confirm only this principal was exported.
183         # self.keytab_as_set() will also check we only got it
184         # once
185         for entry in self.keytab_as_set(keytab_bytes):
186             (principal, enctype, kvno, key) = entry
187             self.assertEqual(principal, new_principal)
188
189     def test_export_keytab_change_update(self):
190         new_principal=f"keytab_testuser@{self.creds.get_realm()}"
191         self.samdb.newuser("keytab_testuser", "4rfvBGT%")
192         self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
193
194         net = Net(None, self.lp)
195         self.addCleanup(self.rm_files, self.ktfile)
196         net.export_keytab(keytab=self.ktfile, principal=new_principal)
197         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
198
199         # Parse the first entry in the keytab
200         with open(self.ktfile, 'rb') as bytes_kt:
201             keytab_orig_bytes = bytes_kt.read()
202
203         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
204
205         net.export_keytab(keytab=self.ktfile, principal=new_principal)
206
207         with open(self.ktfile, 'rb') as bytes_kt:
208             keytab_change_bytes = bytes_kt.read()
209
210         self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
211
212         # We can't parse it as the parser is simple and doesn't
213         # understand holes in the file.
214
215     def test_export_keytab_change2_update(self):
216         new_principal=f"keytab_testuser@{self.creds.get_realm()}"
217         self.samdb.newuser("keytab_testuser", "4rfvBGT%")
218         self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
219
220         net = Net(None, self.lp)
221         self.addCleanup(self.rm_files, self.ktfile)
222         net.export_keytab(keytab=self.ktfile, principal=new_principal)
223         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
224
225         # Parse the first entry in the keytab
226         with open(self.ktfile, 'rb') as bytes_kt:
227             keytab_orig_bytes = bytes_kt.read()
228
229         # intended to trigger the pruning code for old keys
230         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
231         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
232
233         net.export_keytab(keytab=self.ktfile, principal=new_principal)
234
235         with open(self.ktfile, 'rb') as bytes_kt:
236             keytab_change_bytes = bytes_kt.read()
237
238         self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
239
240         # We can't parse it as the parser is simple and doesn't
241         # understand holes in the file.
242
243     def test_export_keytab_change3_update_keep(self):
244         new_principal=f"keytab_testuser@{self.creds.get_realm()}"
245         self.samdb.newuser("keytab_testuser", "4rfvBGT%")
246         self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
247         net = Net(None, self.lp)
248         self.addCleanup(self.rm_files, self.ktfile)
249         net.export_keytab(keytab=self.ktfile, principal=new_principal)
250         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
251
252         # Parse the first entry in the keytab
253         with open(self.ktfile, 'rb') as bytes_kt:
254             keytab_orig_bytes = bytes_kt.read()
255
256         # By changing the password three times, we allow Samba to fill
257         # out current, old, older from supplementalCredentials and
258         # still have one password that must still be from the original
259         # keytab
260         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
261         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
262         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
263
264         net.export_keytab(keytab=self.ktfile, principal=new_principal, keep_stale_entries=True)
265
266         with open(self.ktfile, 'rb') as bytes_kt:
267             keytab_change_bytes = bytes_kt.read()
268
269         self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
270
271         # self.keytab_as_set() will also check we got each entry
272         # exactly once
273         keytab_as_set = self.keytab_as_set(keytab_change_bytes)
274
275         # Look for the new principal, showing this was updated but the old kept
276         found = 0
277         for entry in keytab_as_set:
278             (principal, enctype, kvno, key) = entry
279             if principal == new_principal and enctype == credentials.ENCTYPE_AES128_CTS_HMAC_SHA1_96:
280                 found += 1
281
282         # Samba currently does not export the previous keys into the keytab, but could.
283         self.assertEqual(found, 4)
284
285         # confirm at least 12 keys (4 changes, 1 in orig export and 3
286         # history in 2nd export, 3 enctypes) were exported
287         self.assertGreaterEqual(len(keytab_as_set), 12)
288
289     def test_export_keytab_change2_export2_update_keep(self):
290         new_principal=f"keytab_testuser@{self.creds.get_realm()}"
291         self.samdb.newuser("keytab_testuser", "4rfvBGT%")
292         self.addCleanup(self.samdb.deleteuser, "keytab_testuser")
293         net = Net(None, self.lp)
294         self.addCleanup(self.rm_files, self.ktfile)
295         net.export_keytab(keytab=self.ktfile, principal=new_principal)
296         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
297
298         # Parse the first entry in the keytab
299         with open(self.ktfile, 'rb') as bytes_kt:
300             keytab_orig_bytes = bytes_kt.read()
301
302         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "5rfvBGT%")
303
304         net.export_keytab(keytab=self.ktfile, principal=new_principal, keep_stale_entries=True)
305
306         self.samdb.setpassword(f"(userPrincipalName={new_principal})", "6rfvBGT%")
307
308         net.export_keytab(keytab=self.ktfile, principal=new_principal, keep_stale_entries=True)
309
310         with open(self.ktfile, 'rb') as bytes_kt:
311             keytab_change_bytes = bytes_kt.read()
312
313         self.assertNotEqual(keytab_orig_bytes, keytab_change_bytes)
314
315         # self.keytab_as_set() will also check we got each entry
316         # exactly once
317         keytab_as_set = self.keytab_as_set(keytab_change_bytes)
318
319         # Look for the new principal, showing this was updated but the old kept
320         found = 0
321         for entry in keytab_as_set:
322             (principal, enctype, kvno, key) = entry
323             if principal == new_principal and enctype == credentials.ENCTYPE_AES128_CTS_HMAC_SHA1_96:
324                 found += 1
325
326         # This covers the simple case, one export per password change
327         self.assertEqual(found, 3)
328
329         # confirm at least 9 keys (3 exports, 3 enctypes) were exported
330         self.assertGreaterEqual(len(keytab_as_set), 9)
331
332     def test_export_keytab_not_a_dir(self):
333         net = Net(None, self.lp)
334         with open(self.ktfile, mode='w') as f:
335             f.write("NOT A KEYTAB")
336         self.addCleanup(self.rm_files, self.ktfile)
337
338         try:
339             net.export_keytab(keytab=self.ktfile + "/f")
340             self.fail("Expected failure to write to an existing file")
341         except NTSTATUSError as err:
342             num, _ = err.args
343             self.assertEqual(num, ntstatus.NT_STATUS_NOT_A_DIRECTORY)
344
345     def test_export_keytab_existing(self):
346         net = Net(None, self.lp)
347         with open(self.ktfile, mode='w') as f:
348             f.write("NOT A KEYTAB")
349         self.addCleanup(self.rm_files, self.ktfile)
350
351         try:
352             net.export_keytab(keytab=self.ktfile)
353             self.fail(f"Expected failure to write to an existing file {self.ktfile}")
354         except NTSTATUSError as err:
355             num, _ = err.args
356             self.assertEqual(num, ntstatus.NT_STATUS_OBJECT_NAME_EXISTS)
357
358     def test_export_keytab_gmsa(self):
359
360         # Create gMSA account
361         gmsa_username = "GMSA_K5KeytabTest$"
362         gmsa_principal = f"{gmsa_username}@{self.samdb.domain_dns_name().upper()}"
363         gmsa_base_dn = self.samdb.get_wellknown_dn(
364             self.samdb.get_default_basedn(),
365             dsdb.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER,
366         )
367         gmsa_user_dn = f"CN={gmsa_username},{gmsa_base_dn}"
368
369         msg = self.samdb.search(base="", scope=SCOPE_BASE, attrs=["tokenGroups"])[0]
370         connecting_user_sid = str(ndr_unpack(security.dom_sid, msg["tokenGroups"][0]))
371
372         domain_sid = security.dom_sid(self.samdb.get_domain_sid())
373         allow_sddl = f"O:SYD:(A;;RP;;;{connecting_user_sid})"
374         allow_sd = ndr_pack(security.descriptor.from_sddl(allow_sddl, domain_sid))
375
376         details = {
377             "dn": str(gmsa_user_dn),
378             "objectClass": "msDS-GroupManagedServiceAccount",
379             "msDS-ManagedPasswordInterval": "1",
380             "msDS-GroupMSAMembership": allow_sd,
381             "sAMAccountName": gmsa_username,
382             "userAccountControl": str(UF_WORKSTATION_TRUST_ACCOUNT),
383         }
384
385         delete_force(self.samdb, gmsa_user_dn)
386         self.samdb.add(details)
387         self.addCleanup(delete_force, self.samdb, gmsa_user_dn)
388
389         # Export keytab of gMSA account remotely
390         net = Net(None, self.lp)
391         try:
392             net.export_keytab(samdb=self.samdb, keytab=self.ktfile, principal=gmsa_principal)
393         except RuntimeError as e:
394             self.fail(e)
395
396         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
397
398         # Parse the first entry in the keytab
399         with open(self.ktfile, 'rb') as bytes_kt:
400             keytab_bytes = bytes_kt.read()
401
402         remote_keytab = ndr_unpack(krb5ccache.KEYTAB, keytab_bytes)
403
404         self.rm_files('test.keytab')
405
406         # Export keytab of gMSA account locally
407         try:
408             net.export_keytab(keytab=self.ktfile, principal=gmsa_principal)
409         except RuntimeError as e:
410             self.fail(e)
411
412         self.assertTrue(os.path.exists(self.ktfile), 'keytab was not created')
413
414         # Parse the first entry in the keytab
415         with open(self.ktfile, 'rb') as bytes_kt:
416             keytab_bytes = bytes_kt.read()
417
418         self.rm_files('test.keytab')
419
420         local_keytab = ndr_unpack(krb5ccache.KEYTAB, keytab_bytes)
421
422         # Confirm that the principal is as expected
423
424         principal_parts = gmsa_principal.split('@')
425
426         self.assertEqual(local_keytab.entry.principal.component_count, 1)
427         self.assertEqual(local_keytab.entry.principal.realm, principal_parts[1])
428         self.assertEqual(local_keytab.entry.principal.components[0], principal_parts[0])
429
430         self.assertEqual(remote_keytab.entry.principal.component_count, 1)
431         self.assertEqual(remote_keytab.entry.principal.realm, principal_parts[1])
432         self.assertEqual(remote_keytab.entry.principal.components[0], principal_parts[0])
433
434         # Put all keys from each into a dictionary, and confirm all remote keys are in local keytab
435
436         remote_keys = {}
437
438         while True:
439             remote_keys[remote_keytab.entry.enctype] = bytes(remote_keytab.entry.key.data)
440             keytab_bytes = remote_keytab.further_entry
441             if not keytab_bytes:
442                 break
443
444             remote_keytab = ndr_unpack(krb5ccache.MULTIPLE_KEYTAB_ENTRIES, keytab_bytes)
445
446         local_keys = {}
447
448         while True:
449             local_keys[local_keytab.entry.enctype] = bytes(local_keytab.entry.key.data)
450             keytab_bytes = local_keytab.further_entry
451             if not keytab_bytes:
452                 break
453             local_keytab = ndr_unpack(krb5ccache.MULTIPLE_KEYTAB_ENTRIES, keytab_bytes)
454
455         # Check that the gMSA keys are in the local keys
456         remote_enctypes = set(remote_keys.keys())
457
458         # Check that at least the AES keys were generated
459         self.assertLessEqual({credentials.ENCTYPE_AES256_CTS_HMAC_SHA1_96,
460                               credentials.ENCTYPE_AES128_CTS_HMAC_SHA1_96},
461                              remote_enctypes)
462
463         local_enctypes = set(local_keys.keys())
464
465         self.assertLessEqual(remote_enctypes, local_enctypes)
466
467         common_enctypes = remote_enctypes & local_enctypes
468
469         for enctype in common_enctypes:
470             self.assertEqual(remote_keys[enctype], local_keys[enctype])