1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from datetime import datetime, timezone
24 sys.path.insert(0, "bin/python")
25 os.environ["PYTHONUNBUFFERED"] = "1"
26 from collections import namedtuple
28 from ldb import SCOPE_BASE
29 from samba import generate_random_password
30 from samba.auth import system_session
31 from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
32 from samba.dcerpc import krb5pac, krb5ccache
33 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_NORMAL_ACCOUNT
34 from samba.ndr import ndr_pack, ndr_unpack
35 from samba.samdb import SamDB
37 from samba.tests import delete_force
38 from samba.tests.krb5.raw_testcase import RawKerberosTest
39 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
40 from samba.tests.krb5.rfc4120_constants import (
43 AES256_CTS_HMAC_SHA1_96,
45 KDC_ERR_PREAUTH_REQUIRED,
51 KU_TGS_REP_ENC_PART_SUB_KEY,
59 global_asn1_print = False
60 global_hexdump = False
63 class KDCBaseTest(RawKerberosTest):
64 """ Base class for KDC tests.
69 cls.lp = cls.get_loadparm(cls)
70 cls.username = os.environ["USERNAME"]
71 cls.password = os.environ["PASSWORD"]
72 cls.host = os.environ["SERVER"]
75 c.set_username(cls.username)
76 c.set_password(cls.password)
78 realm = os.environ["REALM"]
83 domain = os.environ["DOMAIN"]
92 cls.session = system_session()
93 cls.ldb = SamDB(url="ldap://%s" % cls.host,
94 session_info=cls.session,
95 credentials=cls.credentials,
97 # fetch the dnsHostName from the RootDse
99 base="", expression="", scope=SCOPE_BASE, attrs=["dnsHostName"])
100 cls.dns_host_name = str(res[0]['dnsHostName'])
104 self.do_asn1_print = global_asn1_print
105 self.do_hexdump = global_hexdump
109 # Clean up any accounts created by create_account
110 for dn in self.accounts:
111 delete_force(self.ldb, dn)
113 def create_account(self, name, machine_account=False, spn=None, upn=None):
114 '''Create an account for testing.
115 The dn of the created account is added to self.accounts,
116 which is used by tearDown to clean up the created accounts.
118 dn = "cn=%s,%s" % (name, self.ldb.domain_dn())
120 # remove the account if it exists, this will happen if a previous test
122 delete_force(self.ldb, dn)
124 object_class = "computer"
125 account_name = "%s$" % name
126 account_control = str(UF_WORKSTATION_TRUST_ACCOUNT)
128 object_class = "user"
130 account_control = str(UF_NORMAL_ACCOUNT)
132 password = generate_random_password(32, 32)
133 utf16pw = ('"%s"' % password).encode('utf-16-le')
137 "objectclass": object_class,
138 "sAMAccountName": account_name,
139 "userAccountControl": account_control,
140 "unicodePwd": utf16pw}
142 details["servicePrincipalName"] = spn
144 details["userPrincipalName"] = upn
145 self.ldb.add(details)
147 creds = Credentials()
149 creds.set_realm(self.ldb.domain_dns_name().upper())
150 creds.set_domain(self.ldb.domain_netbios_name().upper())
151 creds.set_password(password)
152 creds.set_username(account_name)
154 creds.set_workstation(name)
156 # Save the account name so it can be deleted in the tearDown
157 self.accounts.append(dn)
161 def as_req(self, cname, sname, realm, etypes, padata=None):
162 '''Send a Kerberos AS_REQ, returns the undecoded response
165 till = self.get_KerberosTime(offset=36000)
168 req = self.AS_REQ_create(padata=padata,
169 kdc_options=str(kdc_options),
179 EncAuthorizationData=None,
180 EncAuthorizationData_key=None,
181 additional_tickets=None)
182 rep = self.send_recv_transaction(req)
185 def get_as_rep_key(self, creds, rep):
186 '''Extract the session key from an AS-REP
188 rep_padata = self.der_decode(
190 asn1Spec=krb5_asn1.METHOD_DATA())
192 for pa in rep_padata:
193 if pa['padata-type'] == PADATA_ETYPE_INFO2:
194 padata_value = pa['padata-value']
197 etype_info2 = self.der_decode(
198 padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())
200 key = self.PasswordKey_from_etype_info2(creds, etype_info2[0])
203 def get_pa_data(self, creds, rep, skew=0):
204 '''generate the pa_data data element for an AS-REQ
206 key = self.get_as_rep_key(creds, rep)
208 (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)
209 padata = self.PA_ENC_TS_ENC_create(patime, pausec)
210 padata = self.der_encode(padata, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
212 padata = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, padata)
213 padata = self.der_encode(padata, asn1Spec=krb5_asn1.EncryptedData())
215 padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, padata)
219 def get_as_rep_enc_data(self, key, rep):
220 ''' Decrypt and Decode the encrypted data in an AS-REP
222 enc_part = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
223 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
226 enc_part = self.der_decode(
227 enc_part, asn1Spec=krb5_asn1.EncASRepPart())
229 enc_part = self.der_decode(
230 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
234 def check_pre_authenication(self, rep):
235 """ Check that the kdc response was pre-authentication required
237 self.check_error_rep(rep, KDC_ERR_PREAUTH_REQUIRED)
239 def check_as_reply(self, rep):
240 """ Check that the kdc response is an AS-REP and that the
246 match the expected values
249 # Should have a reply, and it should an AS-REP message.
250 self.assertIsNotNone(rep)
251 self.assertEqual(rep['msg-type'], KRB_AS_REP, "rep = {%s}" % rep)
253 # Protocol version number should be 5
254 pvno = int(rep['pvno'])
255 self.assertEqual(5, pvno, "rep = {%s}" % rep)
257 # The ticket version number should be 5
258 tkt_vno = int(rep['ticket']['tkt-vno'])
259 self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
261 # Check that the kvno is not an RODC kvno
262 # MIT kerberos does not provide the kvno, so we treat it as optional.
263 # This is tested in compatability_test.py
264 if 'kvno' in rep['enc-part']:
265 kvno = int(rep['enc-part']['kvno'])
266 # If the high order bits are set this is an RODC kvno.
267 self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
269 def check_tgs_reply(self, rep):
270 """ Check that the kdc response is an TGS-REP and that the
276 match the expected values
279 # Should have a reply, and it should an TGS-REP message.
280 self.assertIsNotNone(rep)
281 self.assertEqual(rep['msg-type'], KRB_TGS_REP, "rep = {%s}" % rep)
283 # Protocol version number should be 5
284 pvno = int(rep['pvno'])
285 self.assertEqual(5, pvno, "rep = {%s}" % rep)
287 # The ticket version number should be 5
288 tkt_vno = int(rep['ticket']['tkt-vno'])
289 self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
291 # Check that the kvno is not an RODC kvno
292 # MIT kerberos does not provide the kvno, so we treat it as optional.
293 # This is tested in compatability_test.py
294 if 'kvno' in rep['enc-part']:
295 kvno = int(rep['enc-part']['kvno'])
296 # If the high order bits are set this is an RODC kvno.
297 self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
299 def check_error_rep(self, rep, expected):
300 """ Check that the reply is an error message, with the expected
301 error-code specified.
303 self.assertIsNotNone(rep)
304 self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
305 self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
307 def tgs_req(self, cname, sname, realm, ticket, key, etypes):
308 '''Send a TGS-REQ, returns the response and the decrypted and
313 till = self.get_KerberosTime(offset=36000)
316 subkey = self.RandomKey(key.etype)
318 (ctime, cusec) = self.get_KerberosTimeWithUsec()
320 req = self.TGS_REQ_create(padata=padata,
324 kdc_options=str(kdc_options),
334 EncAuthorizationData=None,
335 EncAuthorizationData_key=None,
336 additional_tickets=None,
337 ticket_session_key=key,
338 authenticator_subkey=subkey)
339 rep = self.send_recv_transaction(req)
340 self.assertIsNotNone(rep)
342 msg_type = rep['msg-type']
344 if msg_type == KRB_TGS_REP:
345 enc_part = subkey.decrypt(
346 KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
347 enc_part = self.der_decode(
348 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
349 return (rep, enc_part)
351 # Named tuple to contain values of interest when the PAC is decoded.
352 PacData = namedtuple(
354 "account_name account_sid logon_name upn domain_name")
356 PAC_CREDENTIAL_INFO = 2
360 PAC_CONSTRAINED_DELEGATION = 11
361 PAC_UPN_DNS_INFO = 12
363 def get_pac_data(self, authorization_data):
364 '''Decode the PAC element contained in the authorization-data element
372 # The PAC data will be wrapped in an AD_IF_RELEVANT element
373 ad_if_relevant_elements = (
374 x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
375 for dt in ad_if_relevant_elements:
376 buf = self.der_decode(
377 dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
378 # The PAC data is further wrapped in a AD_WIN2K_PAC element
379 for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
380 pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
381 for pac in pb.buffers:
382 if pac.type == self.PAC_LOGON_INFO:
384 pac.info.info.info3.base.account_name)
386 str(pac.info.info.info3.base.domain_sid)
387 + "-" + str(pac.info.info.info3.base.rid))
388 elif pac.type == self.PAC_LOGON_NAME:
389 logon_name = pac.info.account_name
390 elif pac.type == self.PAC_UPN_DNS_INFO:
391 upn = pac.info.upn_name
392 domain_name = pac.info.dns_domain_name
401 def decode_service_ticket(self, creds, ticket):
402 '''Decrypt and decode a service ticket
405 name = creds.get_username()
406 if name.endswith('$'):
408 realm = creds.get_realm()
409 salt = "%s.%s@%s" % (name, realm.lower(), realm.upper())
411 key = self.PasswordKey_create(
412 ticket['enc-part']['etype'],
413 creds.get_password(),
415 ticket['enc-part']['kvno'])
417 enc_part = key.decrypt(KU_TICKET, ticket['enc-part']['cipher'])
418 enc_ticket_part = self.der_decode(
419 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
420 return enc_ticket_part
422 def get_objectSid(self, dn):
423 ''' Get the objectSID for a DN
424 Note: performs an Ldb query.
426 res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
427 self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
428 sid = self.ldb.schema_format_value("objectSID", res[0]["objectSID"][0])
429 return sid.decode('utf8')
431 def add_attribute(self, dn_str, name, value):
432 if isinstance(value, list):
436 flag = ldb.FLAG_MOD_ADD
438 dn = ldb.Dn(self.ldb, dn_str)
439 msg = ldb.Message(dn)
440 msg[name] = ldb.MessageElement(values, flag, name)
443 def modify_attribute(self, dn_str, name, value):
444 if isinstance(value, list):
448 flag = ldb.FLAG_MOD_REPLACE
450 dn = ldb.Dn(self.ldb, dn_str)
451 msg = ldb.Message(dn)
452 msg[name] = ldb.MessageElement(values, flag, name)
455 def create_ccache(self, cname, ticket, enc_part):
456 """ Lay out a version 4 on-disk credentials cache, to be read using the
460 field = krb5ccache.DELTATIME_TAG()
461 field.kdc_sec_offset = 0
462 field.kdc_usec_offset = 0
464 v4tag = krb5ccache.V4TAG()
468 v4tags = krb5ccache.V4TAGS()
470 v4tags.further_tags = b''
472 optional_header = krb5ccache.V4HEADER()
473 optional_header.v4tags = v4tags
475 cname_string = cname['name-string']
477 cprincipal = krb5ccache.PRINCIPAL()
478 cprincipal.name_type = cname['name-type']
479 cprincipal.component_count = len(cname_string)
480 cprincipal.realm = ticket['realm']
481 cprincipal.components = cname_string
483 sname = ticket['sname']
484 sname_string = sname['name-string']
486 sprincipal = krb5ccache.PRINCIPAL()
487 sprincipal.name_type = sname['name-type']
488 sprincipal.component_count = len(sname_string)
489 sprincipal.realm = ticket['realm']
490 sprincipal.components = sname_string
492 key = self.EncryptionKey_import(enc_part['key'])
494 key_data = key.export_obj()
495 keyblock = krb5ccache.KEYBLOCK()
496 keyblock.enctype = key_data['keytype']
497 keyblock.data = key_data['keyvalue']
499 addresses = krb5ccache.ADDRESSES()
503 authdata = krb5ccache.AUTHDATA()
507 # Re-encode the ticket, since it was decoded by another layer.
508 ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
510 authtime = enc_part['authtime']
512 starttime = enc_part['starttime']
515 endtime = enc_part['endtime']
517 cred = krb5ccache.CREDENTIAL()
518 cred.client = cprincipal
519 cred.server = sprincipal
520 cred.keyblock = keyblock
521 cred.authtime = int(datetime.strptime(authtime.decode(),
523 .replace(tzinfo=timezone.utc).timestamp())
524 cred.starttime = int(datetime.strptime(starttime.decode(),
526 .replace(tzinfo=timezone.utc).timestamp())
527 cred.endtime = int(datetime.strptime(endtime.decode(),
529 .replace(tzinfo=timezone.utc).timestamp())
531 # Account for clock skew of up to five minutes.
532 self.assertLess(cred.authtime - 5*60,
533 datetime.now(timezone.utc).timestamp(),
534 "Ticket not yet valid - clocks may be out of sync.")
535 self.assertLess(cred.starttime - 5*60,
536 datetime.now(timezone.utc).timestamp(),
537 "Ticket not yet valid - clocks may be out of sync.")
538 self.assertGreater(cred.endtime - 60*60,
539 datetime.now(timezone.utc).timestamp(),
540 "Ticket already expired/about to expire - clocks may be out of sync.")
542 cred.renew_till = cred.endtime
544 cred.ticket_flags = int(enc_part['flags'], 2)
545 cred.addresses = addresses
546 cred.authdata = authdata
547 cred.ticket = ticket_data
548 cred.second_ticket = b''
550 ccache = krb5ccache.CCACHE()
553 ccache.optional_header = optional_header
554 ccache.principal = cprincipal
557 # Serialise the credentials cache structure.
558 result = ndr_pack(ccache)
560 # Create a temporary file and write the credentials.
561 cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
562 cachefile.write(result)
567 def create_ccache_with_user(self, user_credentials, mach_name,
569 # Obtain a service ticket authorising the user and place it into a
570 # newly created credentials cache file.
572 user_name = user_credentials.get_username()
573 realm = user_credentials.get_realm()
575 # Do the initial AS-REQ, should get a pre-authentication required
577 etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
578 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
580 sname = self.PrincipalName_create(name_type=NT_SRV_HST,
581 names=["krbtgt", realm])
583 rep = self.as_req(cname, sname, realm, etype)
584 self.check_pre_authenication(rep)
587 padata = self.get_pa_data(user_credentials, rep)
588 key = self.get_as_rep_key(user_credentials, rep)
589 rep = self.as_req(cname, sname, realm, etype, padata=padata)
590 self.check_as_reply(rep)
592 # Request a ticket to the host service on the machine account
593 ticket = rep['ticket']
594 enc_part = self.get_as_rep_enc_data(key, rep)
595 key = self.EncryptionKey_import(enc_part['key'])
596 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
598 sname = self.PrincipalName_create(name_type=NT_SRV_HST,
599 names=[service, mach_name])
601 (rep, enc_part) = self.tgs_req(
602 cname, sname, realm, ticket, key, etype)
603 self.check_tgs_reply(rep)
604 key = self.EncryptionKey_import(enc_part['key'])
606 # Check the contents of the pac, and the ticket
607 ticket = rep['ticket']
609 # Write the ticket into a credentials cache file that can be ingested
610 # by the main credentials code.
611 cachefile = self.create_ccache(cname, ticket, enc_part)
613 # Create a credentials object to reference the credentials cache.
614 creds = Credentials()
615 creds.set_kerberos_state(MUST_USE_KERBEROS)
616 creds.set_username(user_name, SPECIFIED)
617 creds.set_realm(realm)
618 creds.set_named_ccache(cachefile.name, SPECIFIED, self.lp)
620 # Return the credentials along with the cache file.
621 return (creds, cachefile)