e345f739e1c0d2b43504c1b7d9affc0ee554cc45
[gd/samba-autobuild/.git] / python / samba / tests / krb5 / kdc_base_test.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import os
21 from datetime import datetime, timezone
22 import tempfile
23
24 sys.path.insert(0, "bin/python")
25 os.environ["PYTHONUNBUFFERED"] = "1"
26 from collections import namedtuple
27 import ldb
28 from ldb import SCOPE_BASE
29 from samba import generate_random_password
30 from samba.auth import system_session
31 from samba.credentials import Credentials, SPECIFIED, MUST_USE_KERBEROS
32 from samba.dcerpc import krb5pac, krb5ccache
33 from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_NORMAL_ACCOUNT
34 from samba.ndr import ndr_pack, ndr_unpack
35 from samba.samdb import SamDB
36
37 from samba.tests import delete_force
38 from samba.tests.krb5.raw_testcase import RawKerberosTest
39 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
40 from samba.tests.krb5.rfc4120_constants import (
41     AD_IF_RELEVANT,
42     AD_WIN2K_PAC,
43     AES256_CTS_HMAC_SHA1_96,
44     ARCFOUR_HMAC_MD5,
45     KDC_ERR_PREAUTH_REQUIRED,
46     KRB_AS_REP,
47     KRB_TGS_REP,
48     KRB_ERROR,
49     KU_AS_REP_ENC_PART,
50     KU_PA_ENC_TIMESTAMP,
51     KU_TGS_REP_ENC_PART_SUB_KEY,
52     KU_TICKET,
53     NT_PRINCIPAL,
54     NT_SRV_HST,
55     PADATA_ENC_TIMESTAMP,
56     PADATA_ETYPE_INFO2,
57 )
58
59 global_asn1_print = False
60 global_hexdump = False
61
62
63 class KDCBaseTest(RawKerberosTest):
64     """ Base class for KDC tests.
65     """
66
67     @classmethod
68     def setUpClass(cls):
69         cls.lp = cls.get_loadparm(cls)
70         cls.username = os.environ["USERNAME"]
71         cls.password = os.environ["PASSWORD"]
72         cls.host = os.environ["SERVER"]
73
74         c = Credentials()
75         c.set_username(cls.username)
76         c.set_password(cls.password)
77         try:
78             realm = os.environ["REALM"]
79             c.set_realm(realm)
80         except KeyError:
81             pass
82         try:
83             domain = os.environ["DOMAIN"]
84             c.set_domain(domain)
85         except KeyError:
86             pass
87
88         c.guess()
89
90         cls.credentials = c
91
92         cls.session = system_session()
93         cls.ldb = SamDB(url="ldap://%s" % cls.host,
94                         session_info=cls.session,
95                         credentials=cls.credentials,
96                         lp=cls.lp)
97         # fetch the dnsHostName from the RootDse
98         res = cls.ldb.search(
99             base="", expression="", scope=SCOPE_BASE, attrs=["dnsHostName"])
100         cls.dns_host_name = str(res[0]['dnsHostName'])
101
102     def setUp(self):
103         super().setUp()
104         self.do_asn1_print = global_asn1_print
105         self.do_hexdump = global_hexdump
106         self.accounts = []
107
108     def tearDown(self):
109         # Clean up any accounts created by create_account
110         for dn in self.accounts:
111             delete_force(self.ldb, dn)
112
113     def create_account(self, name, machine_account=False, spn=None, upn=None):
114         '''Create an account for testing.
115            The dn of the created account is added to self.accounts,
116            which is used by tearDown to clean up the created accounts.
117         '''
118         dn = "cn=%s,%s" % (name, self.ldb.domain_dn())
119
120         # remove the account if it exists, this will happen if a previous test
121         # run failed
122         delete_force(self.ldb, dn)
123         if machine_account:
124             object_class = "computer"
125             account_name = "%s$" % name
126             account_control = str(UF_WORKSTATION_TRUST_ACCOUNT)
127         else:
128             object_class = "user"
129             account_name = name
130             account_control = str(UF_NORMAL_ACCOUNT)
131
132         password = generate_random_password(32, 32)
133         utf16pw = ('"%s"' % password).encode('utf-16-le')
134
135         details = {
136             "dn": dn,
137             "objectclass": object_class,
138             "sAMAccountName": account_name,
139             "userAccountControl": account_control,
140             "unicodePwd": utf16pw}
141         if spn is not None:
142             details["servicePrincipalName"] = spn
143         if upn is not None:
144             details["userPrincipalName"] = upn
145         self.ldb.add(details)
146
147         creds = Credentials()
148         creds.guess(self.lp)
149         creds.set_realm(self.ldb.domain_dns_name().upper())
150         creds.set_domain(self.ldb.domain_netbios_name().upper())
151         creds.set_password(password)
152         creds.set_username(account_name)
153         if machine_account:
154             creds.set_workstation(name)
155         #
156         # Save the account name so it can be deleted in the tearDown
157         self.accounts.append(dn)
158
159         return (creds, dn)
160
161     def as_req(self, cname, sname, realm, etypes, padata=None):
162         '''Send a Kerberos AS_REQ, returns the undecoded response
163         '''
164
165         till = self.get_KerberosTime(offset=36000)
166         kdc_options = 0
167
168         req = self.AS_REQ_create(padata=padata,
169                                  kdc_options=str(kdc_options),
170                                  cname=cname,
171                                  realm=realm,
172                                  sname=sname,
173                                  from_time=None,
174                                  till_time=till,
175                                  renew_time=None,
176                                  nonce=0x7fffffff,
177                                  etypes=etypes,
178                                  addresses=None,
179                                  EncAuthorizationData=None,
180                                  EncAuthorizationData_key=None,
181                                  additional_tickets=None)
182         rep = self.send_recv_transaction(req)
183         return rep
184
185     def get_as_rep_key(self, creds, rep):
186         '''Extract the session key from an AS-REP
187         '''
188         rep_padata = self.der_decode(
189             rep['e-data'],
190             asn1Spec=krb5_asn1.METHOD_DATA())
191
192         for pa in rep_padata:
193             if pa['padata-type'] == PADATA_ETYPE_INFO2:
194                 padata_value = pa['padata-value']
195                 break
196
197         etype_info2 = self.der_decode(
198             padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())
199
200         key = self.PasswordKey_from_etype_info2(creds, etype_info2[0])
201         return key
202
203     def get_pa_data(self, creds, rep, skew=0):
204         '''generate the pa_data data element for an AS-REQ
205         '''
206         key = self.get_as_rep_key(creds, rep)
207
208         (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)
209         padata = self.PA_ENC_TS_ENC_create(patime, pausec)
210         padata = self.der_encode(padata, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
211
212         padata = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, padata)
213         padata = self.der_encode(padata, asn1Spec=krb5_asn1.EncryptedData())
214
215         padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, padata)
216
217         return [padata]
218
219     def get_as_rep_enc_data(self, key, rep):
220         ''' Decrypt and Decode the encrypted data in an AS-REP
221         '''
222         enc_part = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
223         # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
224         # application tag 26
225         try:
226             enc_part = self.der_decode(
227                 enc_part, asn1Spec=krb5_asn1.EncASRepPart())
228         except Exception:
229             enc_part = self.der_decode(
230                 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
231
232         return enc_part
233
234     def check_pre_authenication(self, rep):
235         """ Check that the kdc response was pre-authentication required
236         """
237         self.check_error_rep(rep, KDC_ERR_PREAUTH_REQUIRED)
238
239     def check_as_reply(self, rep):
240         """ Check that the kdc response is an AS-REP and that the
241             values for:
242                 msg-type
243                 pvno
244                 tkt-pvno
245                 kvno
246             match the expected values
247         """
248
249         # Should have a reply, and it should an AS-REP message.
250         self.assertIsNotNone(rep)
251         self.assertEqual(rep['msg-type'], KRB_AS_REP, "rep = {%s}" % rep)
252
253         # Protocol version number should be 5
254         pvno = int(rep['pvno'])
255         self.assertEqual(5, pvno, "rep = {%s}" % rep)
256
257         # The ticket version number should be 5
258         tkt_vno = int(rep['ticket']['tkt-vno'])
259         self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
260
261         # Check that the kvno is not an RODC kvno
262         # MIT kerberos does not provide the kvno, so we treat it as optional.
263         # This is tested in compatability_test.py
264         if 'kvno' in rep['enc-part']:
265             kvno = int(rep['enc-part']['kvno'])
266             # If the high order bits are set this is an RODC kvno.
267             self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
268
269     def check_tgs_reply(self, rep):
270         """ Check that the kdc response is an TGS-REP and that the
271             values for:
272                 msg-type
273                 pvno
274                 tkt-pvno
275                 kvno
276             match the expected values
277         """
278
279         # Should have a reply, and it should an TGS-REP message.
280         self.assertIsNotNone(rep)
281         self.assertEqual(rep['msg-type'], KRB_TGS_REP, "rep = {%s}" % rep)
282
283         # Protocol version number should be 5
284         pvno = int(rep['pvno'])
285         self.assertEqual(5, pvno, "rep = {%s}" % rep)
286
287         # The ticket version number should be 5
288         tkt_vno = int(rep['ticket']['tkt-vno'])
289         self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
290
291         # Check that the kvno is not an RODC kvno
292         # MIT kerberos does not provide the kvno, so we treat it as optional.
293         # This is tested in compatability_test.py
294         if 'kvno' in rep['enc-part']:
295             kvno = int(rep['enc-part']['kvno'])
296             # If the high order bits are set this is an RODC kvno.
297             self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
298
299     def check_error_rep(self, rep, expected):
300         """ Check that the reply is an error message, with the expected
301             error-code specified.
302         """
303         self.assertIsNotNone(rep)
304         self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
305         self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
306
307     def tgs_req(self, cname, sname, realm, ticket, key, etypes):
308         '''Send a TGS-REQ, returns the response and the decrypted and
309            decoded enc-part
310         '''
311
312         kdc_options = "0"
313         till = self.get_KerberosTime(offset=36000)
314         padata = []
315
316         subkey = self.RandomKey(key.etype)
317
318         (ctime, cusec) = self.get_KerberosTimeWithUsec()
319
320         req = self.TGS_REQ_create(padata=padata,
321                                   cusec=cusec,
322                                   ctime=ctime,
323                                   ticket=ticket,
324                                   kdc_options=str(kdc_options),
325                                   cname=cname,
326                                   realm=realm,
327                                   sname=sname,
328                                   from_time=None,
329                                   till_time=till,
330                                   renew_time=None,
331                                   nonce=0x7ffffffe,
332                                   etypes=etypes,
333                                   addresses=None,
334                                   EncAuthorizationData=None,
335                                   EncAuthorizationData_key=None,
336                                   additional_tickets=None,
337                                   ticket_session_key=key,
338                                   authenticator_subkey=subkey)
339         rep = self.send_recv_transaction(req)
340         self.assertIsNotNone(rep)
341
342         msg_type = rep['msg-type']
343         enc_part = None
344         if msg_type == KRB_TGS_REP:
345             enc_part = subkey.decrypt(
346                 KU_TGS_REP_ENC_PART_SUB_KEY, rep['enc-part']['cipher'])
347             enc_part = self.der_decode(
348                 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
349         return (rep, enc_part)
350
351     # Named tuple to contain values of interest when the PAC is decoded.
352     PacData = namedtuple(
353         "PacData",
354         "account_name account_sid logon_name upn domain_name")
355     PAC_LOGON_INFO = 1
356     PAC_CREDENTIAL_INFO = 2
357     PAC_SRV_CHECKSUM = 6
358     PAC_KDC_CHECKSUM = 7
359     PAC_LOGON_NAME = 10
360     PAC_CONSTRAINED_DELEGATION = 11
361     PAC_UPN_DNS_INFO = 12
362
363     def get_pac_data(self, authorization_data):
364         '''Decode the PAC element contained in the authorization-data element
365         '''
366         account_name = None
367         user_sid = None
368         logon_name = None
369         upn = None
370         domain_name = None
371
372         # The PAC data will be wrapped in an AD_IF_RELEVANT element
373         ad_if_relevant_elements = (
374             x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
375         for dt in ad_if_relevant_elements:
376             buf = self.der_decode(
377                 dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
378             # The PAC data is further wrapped in a AD_WIN2K_PAC element
379             for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
380                 pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
381                 for pac in pb.buffers:
382                     if pac.type == self.PAC_LOGON_INFO:
383                         account_name = (
384                             pac.info.info.info3.base.account_name)
385                         user_sid = (
386                             str(pac.info.info.info3.base.domain_sid)
387                             + "-" + str(pac.info.info.info3.base.rid))
388                     elif pac.type == self.PAC_LOGON_NAME:
389                         logon_name = pac.info.account_name
390                     elif pac.type == self.PAC_UPN_DNS_INFO:
391                         upn = pac.info.upn_name
392                         domain_name = pac.info.dns_domain_name
393
394         return self.PacData(
395             account_name,
396             user_sid,
397             logon_name,
398             upn,
399             domain_name)
400
401     def decode_service_ticket(self, creds, ticket):
402         '''Decrypt and decode a service ticket
403         '''
404
405         name = creds.get_username()
406         if name.endswith('$'):
407             name = name[:-1]
408         realm = creds.get_realm()
409         salt = "%s.%s@%s" % (name, realm.lower(), realm.upper())
410
411         key = self.PasswordKey_create(
412             ticket['enc-part']['etype'],
413             creds.get_password(),
414             salt,
415             ticket['enc-part']['kvno'])
416
417         enc_part = key.decrypt(KU_TICKET, ticket['enc-part']['cipher'])
418         enc_ticket_part = self.der_decode(
419             enc_part, asn1Spec=krb5_asn1.EncTicketPart())
420         return enc_ticket_part
421
422     def get_objectSid(self, dn):
423         ''' Get the objectSID for a DN
424             Note: performs an Ldb query.
425         '''
426         res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
427         self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
428         sid = self.ldb.schema_format_value("objectSID", res[0]["objectSID"][0])
429         return sid.decode('utf8')
430
431     def add_attribute(self, dn_str, name, value):
432         if isinstance(value, list):
433             values = value
434         else:
435             values = [value]
436         flag = ldb.FLAG_MOD_ADD
437
438         dn = ldb.Dn(self.ldb, dn_str)
439         msg = ldb.Message(dn)
440         msg[name] = ldb.MessageElement(values, flag, name)
441         self.ldb.modify(msg)
442
443     def modify_attribute(self, dn_str, name, value):
444         if isinstance(value, list):
445             values = value
446         else:
447             values = [value]
448         flag = ldb.FLAG_MOD_REPLACE
449
450         dn = ldb.Dn(self.ldb, dn_str)
451         msg = ldb.Message(dn)
452         msg[name] = ldb.MessageElement(values, flag, name)
453         self.ldb.modify(msg)
454
455     def create_ccache(self, cname, ticket, enc_part):
456         """ Lay out a version 4 on-disk credentials cache, to be read using the
457             FILE: protocol.
458         """
459
460         field = krb5ccache.DELTATIME_TAG()
461         field.kdc_sec_offset = 0
462         field.kdc_usec_offset = 0
463
464         v4tag = krb5ccache.V4TAG()
465         v4tag.tag = 1
466         v4tag.field = field
467
468         v4tags = krb5ccache.V4TAGS()
469         v4tags.tag = v4tag
470         v4tags.further_tags = b''
471
472         optional_header = krb5ccache.V4HEADER()
473         optional_header.v4tags = v4tags
474
475         cname_string = cname['name-string']
476
477         cprincipal = krb5ccache.PRINCIPAL()
478         cprincipal.name_type = cname['name-type']
479         cprincipal.component_count = len(cname_string)
480         cprincipal.realm = ticket['realm']
481         cprincipal.components = cname_string
482
483         sname = ticket['sname']
484         sname_string = sname['name-string']
485
486         sprincipal = krb5ccache.PRINCIPAL()
487         sprincipal.name_type = sname['name-type']
488         sprincipal.component_count = len(sname_string)
489         sprincipal.realm = ticket['realm']
490         sprincipal.components = sname_string
491
492         key = self.EncryptionKey_import(enc_part['key'])
493
494         key_data = key.export_obj()
495         keyblock = krb5ccache.KEYBLOCK()
496         keyblock.enctype = key_data['keytype']
497         keyblock.data = key_data['keyvalue']
498
499         addresses = krb5ccache.ADDRESSES()
500         addresses.count = 0
501         addresses.data = []
502
503         authdata = krb5ccache.AUTHDATA()
504         authdata.count = 0
505         authdata.data = []
506
507         # Re-encode the ticket, since it was decoded by another layer.
508         ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
509
510         authtime = enc_part['authtime']
511         try:
512             starttime = enc_part['starttime']
513         except KeyError:
514             starttime = authtime
515         endtime = enc_part['endtime']
516
517         cred = krb5ccache.CREDENTIAL()
518         cred.client = cprincipal
519         cred.server = sprincipal
520         cred.keyblock = keyblock
521         cred.authtime = int(datetime.strptime(authtime.decode(),
522                                               "%Y%m%d%H%M%SZ")
523                             .replace(tzinfo=timezone.utc).timestamp())
524         cred.starttime = int(datetime.strptime(starttime.decode(),
525                                                "%Y%m%d%H%M%SZ")
526                             .replace(tzinfo=timezone.utc).timestamp())
527         cred.endtime = int(datetime.strptime(endtime.decode(),
528                                              "%Y%m%d%H%M%SZ")
529                             .replace(tzinfo=timezone.utc).timestamp())
530
531         # Account for clock skew of up to five minutes.
532         self.assertLess(cred.authtime - 5*60,
533                         datetime.now(timezone.utc).timestamp(),
534                         "Ticket not yet valid - clocks may be out of sync.")
535         self.assertLess(cred.starttime - 5*60,
536                         datetime.now(timezone.utc).timestamp(),
537                         "Ticket not yet valid - clocks may be out of sync.")
538         self.assertGreater(cred.endtime - 60*60,
539                            datetime.now(timezone.utc).timestamp(),
540                            "Ticket already expired/about to expire - clocks may be out of sync.")
541
542         cred.renew_till = cred.endtime
543         cred.is_skey = 0
544         cred.ticket_flags = int(enc_part['flags'], 2)
545         cred.addresses = addresses
546         cred.authdata = authdata
547         cred.ticket = ticket_data
548         cred.second_ticket = b''
549
550         ccache = krb5ccache.CCACHE()
551         ccache.pvno = 5
552         ccache.version = 4
553         ccache.optional_header = optional_header
554         ccache.principal = cprincipal
555         ccache.cred = cred
556
557         # Serialise the credentials cache structure.
558         result = ndr_pack(ccache)
559
560         # Create a temporary file and write the credentials.
561         cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
562         cachefile.write(result)
563         cachefile.close()
564
565         return cachefile
566
567     def create_ccache_with_user(self, user_credentials, mach_name,
568                                 service="host"):
569         # Obtain a service ticket authorising the user and place it into a
570         # newly created credentials cache file.
571
572         user_name = user_credentials.get_username()
573         realm = user_credentials.get_realm()
574
575         # Do the initial AS-REQ, should get a pre-authentication required
576         # response
577         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
578         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
579                                           names=[user_name])
580         sname = self.PrincipalName_create(name_type=NT_SRV_HST,
581                                           names=["krbtgt", realm])
582
583         rep = self.as_req(cname, sname, realm, etype)
584         self.check_pre_authenication(rep)
585
586         # Do the next AS-REQ
587         padata = self.get_pa_data(user_credentials, rep)
588         key = self.get_as_rep_key(user_credentials, rep)
589         rep = self.as_req(cname, sname, realm, etype, padata=padata)
590         self.check_as_reply(rep)
591
592         # Request a ticket to the host service on the machine account
593         ticket = rep['ticket']
594         enc_part = self.get_as_rep_enc_data(key, rep)
595         key = self.EncryptionKey_import(enc_part['key'])
596         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
597                                           names=[user_name])
598         sname = self.PrincipalName_create(name_type=NT_SRV_HST,
599                                           names=[service, mach_name])
600
601         (rep, enc_part) = self.tgs_req(
602             cname, sname, realm, ticket, key, etype)
603         self.check_tgs_reply(rep)
604         key = self.EncryptionKey_import(enc_part['key'])
605
606         # Check the contents of the pac, and the ticket
607         ticket = rep['ticket']
608
609         # Write the ticket into a credentials cache file that can be ingested
610         # by the main credentials code.
611         cachefile = self.create_ccache(cname, ticket, enc_part)
612
613         # Create a credentials object to reference the credentials cache.
614         creds = Credentials()
615         creds.set_kerberos_state(MUST_USE_KERBEROS)
616         creds.set_username(user_name, SPECIFIED)
617         creds.set_realm(realm)
618         creds.set_named_ccache(cachefile.name, SPECIFIED, self.lp)
619
620         # Return the credentials along with the cache file.
621         return (creds, cachefile)