CVE-2022-2031 tests/krb5: Allow requesting a TGT to a different sname and realm
[samba.git] / python / samba / tests / krb5 / kdc_base_test.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Stefan Metzmacher 2020
3 # Copyright (C) 2020-2021 Catalyst.Net Ltd
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import os
21 from datetime import datetime, timezone
22 import tempfile
23 import binascii
24 import collections
25 import secrets
26 from enum import Enum
27
28 from collections import namedtuple
29 import ldb
30 from ldb import SCOPE_BASE
31 from samba import generate_random_password
32 from samba.auth import system_session
33 from samba.credentials import (
34     Credentials,
35     SPECIFIED,
36     DONT_USE_KERBEROS,
37     MUST_USE_KERBEROS,
38 )
39 from samba.dcerpc import drsblobs, drsuapi, misc, krb5pac, krb5ccache, security
40 from samba.drs_utils import drs_Replicate, drsuapi_connect
41 from samba.dsdb import (
42     DSDB_SYNTAX_BINARY_DN,
43     DS_DOMAIN_FUNCTION_2000,
44     DS_DOMAIN_FUNCTION_2008,
45     DS_GUID_COMPUTERS_CONTAINER,
46     DS_GUID_DOMAIN_CONTROLLERS_CONTAINER,
47     DS_GUID_USERS_CONTAINER,
48     UF_WORKSTATION_TRUST_ACCOUNT,
49     UF_NO_AUTH_DATA_REQUIRED,
50     UF_NORMAL_ACCOUNT,
51     UF_NOT_DELEGATED,
52     UF_PARTIAL_SECRETS_ACCOUNT,
53     UF_SERVER_TRUST_ACCOUNT,
54     UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
55 )
56 from samba.join import DCJoinContext
57 from samba.ndr import ndr_pack, ndr_unpack
58 from samba import net
59 from samba.samdb import SamDB, dsdb_Dn
60
61 from samba.tests import delete_force
62 import samba.tests.krb5.kcrypto as kcrypto
63 from samba.tests.krb5.raw_testcase import (
64     KerberosCredentials,
65     KerberosTicketCreds,
66     RawKerberosTest
67 )
68 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
69 from samba.tests.krb5.rfc4120_constants import (
70     AD_IF_RELEVANT,
71     AD_WIN2K_PAC,
72     AES256_CTS_HMAC_SHA1_96,
73     ARCFOUR_HMAC_MD5,
74     KDC_ERR_PREAUTH_REQUIRED,
75     KDC_ERR_TGT_REVOKED,
76     KRB_AS_REP,
77     KRB_TGS_REP,
78     KRB_ERROR,
79     KU_AS_REP_ENC_PART,
80     KU_ENC_CHALLENGE_CLIENT,
81     KU_PA_ENC_TIMESTAMP,
82     KU_TICKET,
83     NT_PRINCIPAL,
84     NT_SRV_INST,
85     PADATA_ENCRYPTED_CHALLENGE,
86     PADATA_ENC_TIMESTAMP,
87     PADATA_ETYPE_INFO2,
88 )
89
# Put the "bin/python" directory first on the module search path.
sys.path.insert(0, "bin/python")
# Request unbuffered Python output through the environment.
os.environ["PYTHONUNBUFFERED"] = "1"

# Module-wide debugging switches; KDCBaseTest.setUp() copies them onto each
# test instance as do_asn1_print and do_hexdump.
global_asn1_print = False
global_hexdump = False
95
96
97 class KDCBaseTest(RawKerberosTest):
98     """ Base class for KDC tests.
99     """
100
101     class AccountType(Enum):
102         USER = object()
103         COMPUTER = object()
104         SERVER = object()
105         RODC = object()
106
107     @classmethod
108     def setUpClass(cls):
109         super().setUpClass()
110         cls._lp = None
111
112         cls._ldb = None
113         cls._rodc_ldb = None
114
115         cls._functional_level = None
116
117         # An identifier to ensure created accounts have unique names. Windows
118         # caches accounts based on usernames, so account names being different
119         # across test runs avoids previous test runs affecting the results.
120         cls.account_base = f'{secrets.token_hex(4)}_'
121         cls.account_id = 0
122
123         # A list containing DNs of accounts created as part of testing.
124         cls.accounts = []
125
126         cls.account_cache = {}
127         cls.tkt_cache = {}
128
129         cls._rodc_ctx = None
130
131         cls.ldb_cleanups = []
132
133     @classmethod
134     def tearDownClass(cls):
135         # Clean up any accounts created by create_account. This is
136         # done in tearDownClass() rather than tearDown(), so that
137         # accounts need only be created once for permutation tests.
138         if cls._ldb is not None:
139             for cleanup in reversed(cls.ldb_cleanups):
140                 try:
141                     cls._ldb.modify(cleanup)
142                 except ldb.LdbError:
143                     pass
144
145             for dn in reversed(cls.accounts):
146                 delete_force(cls._ldb, dn)
147
148         if cls._rodc_ctx is not None:
149             cls._rodc_ctx.cleanup_old_join(force=True)
150
151         super().tearDownClass()
152
153     def setUp(self):
154         super().setUp()
155         self.do_asn1_print = global_asn1_print
156         self.do_hexdump = global_hexdump
157
158     def get_lp(self):
159         if self._lp is None:
160             type(self)._lp = self.get_loadparm()
161
162         return self._lp
163
164     def get_samdb(self):
165         if self._ldb is None:
166             creds = self.get_admin_creds()
167             lp = self.get_lp()
168
169             session = system_session()
170             type(self)._ldb = SamDB(url="ldap://%s" % self.dc_host,
171                                     session_info=session,
172                                     credentials=creds,
173                                     lp=lp)
174
175         return self._ldb
176
177     def get_rodc_samdb(self):
178         if self._rodc_ldb is None:
179             creds = self.get_admin_creds()
180             lp = self.get_lp()
181
182             session = system_session()
183             type(self)._rodc_ldb = SamDB(url="ldap://%s" % self.host,
184                                          session_info=session,
185                                          credentials=creds,
186                                          lp=lp,
187                                          am_rodc=True)
188
189         return self._rodc_ldb
190
191     def get_server_dn(self, samdb):
192         server = samdb.get_serverName()
193
194         res = samdb.search(base=server,
195                            scope=ldb.SCOPE_BASE,
196                            attrs=['serverReference'])
197         dn = ldb.Dn(samdb, res[0]['serverReference'][0].decode('utf8'))
198
199         return dn
200
201     def get_mock_rodc_ctx(self):
202         if self._rodc_ctx is None:
203             admin_creds = self.get_admin_creds()
204             lp = self.get_lp()
205
206             rodc_name = self.get_new_username()
207             site_name = 'Default-First-Site-Name'
208
209             rodc_ctx = DCJoinContext(server=self.dc_host,
210                                      creds=admin_creds,
211                                      lp=lp,
212                                      site=site_name,
213                                      netbios_name=rodc_name,
214                                      targetdir=None,
215                                      domain=None)
216             self.create_rodc(rodc_ctx)
217
218             type(self)._rodc_ctx = rodc_ctx
219
220         return self._rodc_ctx
221
222     def get_domain_functional_level(self, ldb):
223         if self._functional_level is None:
224             res = ldb.search(base='',
225                              scope=SCOPE_BASE,
226                              attrs=['domainFunctionality'])
227             try:
228                 functional_level = int(res[0]['domainFunctionality'][0])
229             except KeyError:
230                 functional_level = DS_DOMAIN_FUNCTION_2000
231
232             type(self)._functional_level = functional_level
233
234         return self._functional_level
235
236     def get_default_enctypes(self):
237         samdb = self.get_samdb()
238         functional_level = self.get_domain_functional_level(samdb)
239
240         # RC4 should always be supported
241         default_enctypes = {kcrypto.Enctype.RC4}
242         if functional_level >= DS_DOMAIN_FUNCTION_2008:
243             # AES is only supported at functional level 2008 or higher
244             default_enctypes.add(kcrypto.Enctype.AES256)
245             default_enctypes.add(kcrypto.Enctype.AES128)
246
247         return default_enctypes
248
249     def create_group(self, samdb, name, ou=None):
250         if ou is None:
251             ou = samdb.get_wellknown_dn(samdb.get_default_basedn(),
252                                         DS_GUID_USERS_CONTAINER)
253
254         dn = f'CN={name},{ou}'
255
256         # Remove the group if it exists; this will happen if a previous test
257         # run failed.
258         delete_force(samdb, dn)
259
260         # Save the group name so it can be deleted in tearDownClass.
261         self.accounts.append(dn)
262
263         details = {
264             'dn': dn,
265             'objectClass': 'group'
266         }
267         samdb.add(details)
268
269         return dn
270
271     def create_account(self, samdb, name, account_type=AccountType.USER,
272                        spn=None, upn=None, additional_details=None,
273                        ou=None, account_control=0, add_dollar=True,
274                        expired_password=False):
275         '''Create an account for testing.
276            The dn of the created account is added to self.accounts,
277            which is used by tearDownClass to clean up the created accounts.
278         '''
279         if ou is None:
280             if account_type is self.AccountType.COMPUTER:
281                 guid = DS_GUID_COMPUTERS_CONTAINER
282             elif account_type is self.AccountType.SERVER:
283                 guid = DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
284             else:
285                 guid = DS_GUID_USERS_CONTAINER
286
287             ou = samdb.get_wellknown_dn(samdb.get_default_basedn(), guid)
288
289         dn = "CN=%s,%s" % (name, ou)
290
291         # remove the account if it exists, this will happen if a previous test
292         # run failed
293         delete_force(samdb, dn)
294         account_name = name
295         if account_type is self.AccountType.USER:
296             object_class = "user"
297             account_control |= UF_NORMAL_ACCOUNT
298         else:
299             object_class = "computer"
300             if add_dollar:
301                 account_name += '$'
302             if account_type is self.AccountType.COMPUTER:
303                 account_control |= UF_WORKSTATION_TRUST_ACCOUNT
304             elif account_type is self.AccountType.SERVER:
305                 account_control |= UF_SERVER_TRUST_ACCOUNT
306             else:
307                 self.fail()
308
309         password = generate_random_password(32, 32)
310         utf16pw = ('"%s"' % password).encode('utf-16-le')
311
312         details = {
313             "dn": dn,
314             "objectclass": object_class,
315             "sAMAccountName": account_name,
316             "userAccountControl": str(account_control),
317             "unicodePwd": utf16pw}
318         if upn is not None:
319             upn = upn.format(account=account_name)
320         if spn is not None:
321             if isinstance(spn, str):
322                 spn = spn.format(account=account_name)
323             else:
324                 spn = tuple(s.format(account=account_name) for s in spn)
325             details["servicePrincipalName"] = spn
326         if upn is not None:
327             details["userPrincipalName"] = upn
328         if expired_password:
329             details["pwdLastSet"] = "0"
330         if additional_details is not None:
331             details.update(additional_details)
332         # Save the account name so it can be deleted in tearDownClass
333         self.accounts.append(dn)
334         samdb.add(details)
335
336         creds = KerberosCredentials()
337         creds.guess(self.get_lp())
338         creds.set_realm(samdb.domain_dns_name().upper())
339         creds.set_domain(samdb.domain_netbios_name().upper())
340         creds.set_password(password)
341         creds.set_username(account_name)
342         if account_type is self.AccountType.USER:
343             creds.set_workstation('')
344         else:
345             creds.set_workstation(name)
346         creds.set_dn(ldb.Dn(samdb, dn))
347         creds.set_upn(upn)
348         creds.set_spn(spn)
349
350         self.creds_set_enctypes(creds)
351
352         res = samdb.search(base=dn,
353                            scope=ldb.SCOPE_BASE,
354                            attrs=['msDS-KeyVersionNumber'])
355         kvno = res[0].get('msDS-KeyVersionNumber', idx=0)
356         if kvno is not None:
357             self.assertEqual(int(kvno), 1)
358         creds.set_kvno(1)
359
360         return (creds, dn)
361
362     def get_security_descriptor(self, dn):
363         samdb = self.get_samdb()
364
365         sid = self.get_objectSid(samdb, dn)
366
367         owner_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
368
369         ace = security.ace()
370         ace.access_mask = security.SEC_ADS_CONTROL_ACCESS
371
372         ace.trustee = security.dom_sid(sid)
373
374         dacl = security.acl()
375         dacl.revision = security.SECURITY_ACL_REVISION_ADS
376         dacl.aces = [ace]
377         dacl.num_aces = 1
378
379         security_desc = security.descriptor()
380         security_desc.type |= security.SEC_DESC_DACL_PRESENT
381         security_desc.owner_sid = owner_sid
382         security_desc.dacl = dacl
383
384         return ndr_pack(security_desc)
385
386     def create_rodc(self, ctx):
387         ctx.nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
388         ctx.full_nc_list = [ctx.base_dn, ctx.config_dn, ctx.schema_dn]
389         ctx.krbtgt_dn = f'CN=krbtgt_{ctx.myname},CN=Users,{ctx.base_dn}'
390
391         ctx.never_reveal_sid = [f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_DENY}>',
392                                 f'<SID={security.SID_BUILTIN_ADMINISTRATORS}>',
393                                 f'<SID={security.SID_BUILTIN_SERVER_OPERATORS}>',
394                                 f'<SID={security.SID_BUILTIN_BACKUP_OPERATORS}>',
395                                 f'<SID={security.SID_BUILTIN_ACCOUNT_OPERATORS}>']
396         ctx.reveal_sid = f'<SID={ctx.domsid}-{security.DOMAIN_RID_RODC_ALLOW}>'
397
398         mysid = ctx.get_mysid()
399         admin_dn = f'<SID={mysid}>'
400         ctx.managedby = admin_dn
401
402         ctx.userAccountControl = (UF_WORKSTATION_TRUST_ACCOUNT |
403                                   UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
404                                   UF_PARTIAL_SECRETS_ACCOUNT)
405
406         ctx.connection_dn = f'CN=RODC Connection (FRS),{ctx.ntds_dn}'
407         ctx.secure_channel_type = misc.SEC_CHAN_RODC
408         ctx.RODC = True
409         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_INIT_SYNC |
410                              drsuapi.DRSUAPI_DRS_PER_SYNC |
411                              drsuapi.DRSUAPI_DRS_GET_ANC |
412                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
413                              drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
414         ctx.domain_replica_flags = ctx.replica_flags | drsuapi.DRSUAPI_DRS_CRITICAL_ONLY
415
416         ctx.build_nc_lists()
417
418         ctx.cleanup_old_join()
419
420         try:
421             ctx.join_add_objects()
422         except Exception:
423             # cleanup the failed join (checking we still have a live LDB
424             # connection to the remote DC first)
425             ctx.refresh_ldb_connection()
426             ctx.cleanup_old_join()
427             raise
428
429     def replicate_account_to_rodc(self, dn):
430         samdb = self.get_samdb()
431         rodc_samdb = self.get_rodc_samdb()
432
433         repl_val = f'{samdb.get_dsServiceName()}:{dn}:SECRETS_ONLY'
434
435         msg = ldb.Message()
436         msg.dn = ldb.Dn(rodc_samdb, '')
437         msg['replicateSingleObject'] = ldb.MessageElement(
438             repl_val,
439             ldb.FLAG_MOD_REPLACE,
440             'replicateSingleObject')
441
442         try:
443             # Try replication using the replicateSingleObject rootDSE
444             # operation.
445             rodc_samdb.modify(msg)
446         except ldb.LdbError as err:
447             enum, estr = err.args
448             self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
449             self.assertIn('rootdse_modify: unknown attribute to change!',
450                           estr)
451
452             # If that method wasn't supported, we may be in the rodc:local test
453             # environment, where we can try replicating to the local database.
454
455             lp = self.get_lp()
456
457             rodc_creds = Credentials()
458             rodc_creds.guess(lp)
459             rodc_creds.set_machine_account(lp)
460
461             local_samdb = SamDB(url=None, session_info=system_session(),
462                                 credentials=rodc_creds, lp=lp)
463
464             destination_dsa_guid = misc.GUID(local_samdb.get_ntds_GUID())
465
466             repl = drs_Replicate(f'ncacn_ip_tcp:{self.dc_host}[seal]',
467                                  lp, rodc_creds,
468                                  local_samdb, destination_dsa_guid)
469
470             source_dsa_invocation_id = misc.GUID(samdb.invocation_id)
471
472             repl.replicate(dn,
473                            source_dsa_invocation_id,
474                            destination_dsa_guid,
475                            exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
476                            rodc=True)
477
478     def reveal_account_to_mock_rodc(self, dn):
479         samdb = self.get_samdb()
480         rodc_ctx = self.get_mock_rodc_ctx()
481
482         self.get_secrets(
483             samdb,
484             dn,
485             destination_dsa_guid=rodc_ctx.ntds_guid,
486             source_dsa_invocation_id=misc.GUID(samdb.invocation_id))
487
488     def check_revealed(self, dn, rodc_dn, revealed=True):
489         samdb = self.get_samdb()
490
491         res = samdb.search(base=rodc_dn,
492                            scope=ldb.SCOPE_BASE,
493                            attrs=['msDS-RevealedUsers'])
494
495         revealed_users = res[0].get('msDS-RevealedUsers')
496         if revealed_users is None:
497             self.assertFalse(revealed)
498             return
499
500         revealed_dns = set(str(dsdb_Dn(samdb, str(user),
501                                        syntax_oid=DSDB_SYNTAX_BINARY_DN).dn)
502                            for user in revealed_users)
503
504         if revealed:
505             self.assertIn(str(dn), revealed_dns)
506         else:
507             self.assertNotIn(str(dn), revealed_dns)
508
509     def get_secrets(self, samdb, dn,
510                     destination_dsa_guid,
511                     source_dsa_invocation_id):
512         admin_creds = self.get_admin_creds()
513
514         dns_hostname = samdb.host_dns_name()
515         (bind, handle, _) = drsuapi_connect(dns_hostname,
516                                             self.get_lp(),
517                                             admin_creds)
518
519         req = drsuapi.DsGetNCChangesRequest8()
520
521         req.destination_dsa_guid = destination_dsa_guid
522         req.source_dsa_invocation_id = source_dsa_invocation_id
523
524         naming_context = drsuapi.DsReplicaObjectIdentifier()
525         naming_context.dn = dn
526
527         req.naming_context = naming_context
528
529         hwm = drsuapi.DsReplicaHighWaterMark()
530         hwm.tmp_highest_usn = 0
531         hwm.reserved_usn = 0
532         hwm.highest_usn = 0
533
534         req.highwatermark = hwm
535         req.uptodateness_vector = None
536
537         req.replica_flags = 0
538
539         req.max_object_count = 1
540         req.max_ndr_size = 402116
541         req.extended_op = drsuapi.DRSUAPI_EXOP_REPL_SECRET
542
543         attids = [drsuapi.DRSUAPI_ATTID_supplementalCredentials,
544                   drsuapi.DRSUAPI_ATTID_unicodePwd,
545                   drsuapi.DRSUAPI_ATTID_ntPwdHistory]
546
547         partial_attribute_set = drsuapi.DsPartialAttributeSet()
548         partial_attribute_set.version = 1
549         partial_attribute_set.attids = attids
550         partial_attribute_set.num_attids = len(attids)
551
552         req.partial_attribute_set = partial_attribute_set
553
554         req.partial_attribute_set_ex = None
555         req.mapping_ctr.num_mappings = 0
556         req.mapping_ctr.mappings = None
557
558         _, ctr = bind.DsGetNCChanges(handle, 8, req)
559
560         self.assertEqual(1, ctr.object_count)
561
562         identifier = ctr.first_object.object.identifier
563         attributes = ctr.first_object.object.attribute_ctr.attributes
564
565         self.assertEqual(dn, identifier.dn)
566
567         return bind, identifier, attributes
568
569     def get_keys(self, samdb, dn, expected_etypes=None):
570         admin_creds = self.get_admin_creds()
571
572         bind, identifier, attributes = self.get_secrets(
573             samdb,
574             str(dn),
575             destination_dsa_guid=misc.GUID(samdb.get_ntds_GUID()),
576             source_dsa_invocation_id=misc.GUID())
577
578         rid = identifier.sid.split()[1]
579
580         net_ctx = net.Net(admin_creds)
581
582         keys = {}
583
584         for attr in attributes:
585             if attr.attid == drsuapi.DRSUAPI_ATTID_supplementalCredentials:
586                 net_ctx.replicate_decrypt(bind, attr, rid)
587                 attr_val = attr.value_ctr.values[0].blob
588
589                 spl = ndr_unpack(drsblobs.supplementalCredentialsBlob,
590                                  attr_val)
591                 for pkg in spl.sub.packages:
592                     if pkg.name == 'Primary:Kerberos-Newer-Keys':
593                         krb5_new_keys_raw = binascii.a2b_hex(pkg.data)
594                         krb5_new_keys = ndr_unpack(
595                             drsblobs.package_PrimaryKerberosBlob,
596                             krb5_new_keys_raw)
597                         for key in krb5_new_keys.ctr.keys:
598                             keytype = key.keytype
599                             if keytype in (kcrypto.Enctype.AES256,
600                                            kcrypto.Enctype.AES128):
601                                 keys[keytype] = key.value.hex()
602             elif attr.attid == drsuapi.DRSUAPI_ATTID_unicodePwd:
603                 net_ctx.replicate_decrypt(bind, attr, rid)
604                 if attr.value_ctr.num_values > 0:
605                     pwd = attr.value_ctr.values[0].blob
606                     keys[kcrypto.Enctype.RC4] = pwd.hex()
607
608         if expected_etypes is None:
609             expected_etypes = self.get_default_enctypes()
610
611         self.assertCountEqual(expected_etypes, keys)
612
613         return keys
614
615     def creds_set_keys(self, creds, keys):
616         if keys is not None:
617             for enctype, key in keys.items():
618                 creds.set_forced_key(enctype, key)
619
620     def creds_set_enctypes(self, creds):
621         samdb = self.get_samdb()
622
623         res = samdb.search(creds.get_dn(),
624                            scope=ldb.SCOPE_BASE,
625                            attrs=['msDS-SupportedEncryptionTypes'])
626         supported_enctypes = res[0].get('msDS-SupportedEncryptionTypes', idx=0)
627
628         if supported_enctypes is None:
629             supported_enctypes = 0
630
631         creds.set_as_supported_enctypes(supported_enctypes)
632         creds.set_tgs_supported_enctypes(supported_enctypes)
633         creds.set_ap_supported_enctypes(supported_enctypes)
634
635     def creds_set_default_enctypes(self, creds,
636                                    fast_support=False,
637                                    claims_support=False,
638                                    compound_id_support=False):
639         default_enctypes = self.get_default_enctypes()
640         supported_enctypes = KerberosCredentials.etypes_to_bits(
641             default_enctypes)
642
643         if fast_support:
644             supported_enctypes |= security.KERB_ENCTYPE_FAST_SUPPORTED
645         if claims_support:
646             supported_enctypes |= security.KERB_ENCTYPE_CLAIMS_SUPPORTED
647         if compound_id_support:
648             supported_enctypes |= (
649                 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED)
650
651         creds.set_as_supported_enctypes(supported_enctypes)
652         creds.set_tgs_supported_enctypes(supported_enctypes)
653         creds.set_ap_supported_enctypes(supported_enctypes)
654
655     def add_to_group(self, account_dn, group_dn, group_attr, expect_attr=True):
656         samdb = self.get_samdb()
657
658         try:
659             res = samdb.search(base=group_dn,
660                                scope=ldb.SCOPE_BASE,
661                                attrs=[group_attr])
662         except ldb.LdbError as err:
663             num, _ = err.args
664             if num != ldb.ERR_NO_SUCH_OBJECT:
665                 raise
666
667             self.fail(err)
668
669         orig_msg = res[0]
670         members = orig_msg.get(group_attr)
671         if expect_attr:
672             self.assertIsNotNone(members)
673         elif members is None:
674             members = ()
675
676         members = list(members)
677         members.append(account_dn)
678
679         msg = ldb.Message()
680         msg.dn = group_dn
681         msg[group_attr] = ldb.MessageElement(members,
682                                              ldb.FLAG_MOD_REPLACE,
683                                              group_attr)
684
685         cleanup = samdb.msg_diff(msg, orig_msg)
686         self.ldb_cleanups.append(cleanup)
687         samdb.modify(msg)
688
689         return cleanup
690
691     def remove_from_group(self, account_dn, group_dn):
692         samdb = self.get_samdb()
693
694         res = samdb.search(base=group_dn,
695                            scope=ldb.SCOPE_BASE,
696                            attrs=['member'])
697         orig_msg = res[0]
698         self.assertIn('member', orig_msg)
699         members = list(orig_msg['member'])
700
701         account_dn = str(account_dn).encode('utf-8')
702         self.assertIn(account_dn, members)
703         members.remove(account_dn)
704
705         msg = ldb.Message()
706         msg.dn = group_dn
707         msg['member'] = ldb.MessageElement(members,
708                                            ldb.FLAG_MOD_REPLACE,
709                                            'member')
710
711         cleanup = samdb.msg_diff(msg, orig_msg)
712         self.ldb_cleanups.append(cleanup)
713         samdb.modify(msg)
714
715         return cleanup
716
717     def get_cached_creds(self, *,
718                          account_type,
719                          opts=None,
720                          use_cache=True):
721         if opts is None:
722             opts = {}
723
724         opts_default = {
725             'name_prefix': None,
726             'name_suffix': None,
727             'add_dollar': True,
728             'upn': None,
729             'spn': None,
730             'additional_details': None,
731             'allowed_replication': False,
732             'allowed_replication_mock': False,
733             'denied_replication': False,
734             'denied_replication_mock': False,
735             'revealed_to_rodc': False,
736             'revealed_to_mock_rodc': False,
737             'no_auth_data_required': False,
738             'expired_password': False,
739             'supported_enctypes': None,
740             'not_delegated': False,
741             'delegation_to_spn': None,
742             'delegation_from_dn': None,
743             'trusted_to_auth_for_delegation': False,
744             'fast_support': False,
745             'member_of': None,
746             'kerberos_enabled': True,
747             'secure_channel_type': None,
748             'id': None
749         }
750
751         account_opts = {
752             'account_type': account_type,
753             **opts_default,
754             **opts
755         }
756
757         cache_key = tuple(sorted(account_opts.items()))
758
759         if use_cache:
760             creds = self.account_cache.get(cache_key)
761             if creds is not None:
762                 return creds
763
764         creds = self.create_account_opts(**account_opts)
765         if use_cache:
766             self.account_cache[cache_key] = creds
767
768         return creds
769
770     def create_account_opts(self, *,
771                             account_type,
772                             name_prefix,
773                             name_suffix,
774                             add_dollar,
775                             upn,
776                             spn,
777                             additional_details,
778                             allowed_replication,
779                             allowed_replication_mock,
780                             denied_replication,
781                             denied_replication_mock,
782                             revealed_to_rodc,
783                             revealed_to_mock_rodc,
784                             no_auth_data_required,
785                             expired_password,
786                             supported_enctypes,
787                             not_delegated,
788                             delegation_to_spn,
789                             delegation_from_dn,
790                             trusted_to_auth_for_delegation,
791                             fast_support,
792                             member_of,
793                             kerberos_enabled,
794                             secure_channel_type,
795                             id):
796         if account_type is self.AccountType.USER:
797             self.assertIsNone(spn)
798             self.assertIsNone(delegation_to_spn)
799             self.assertIsNone(delegation_from_dn)
800             self.assertFalse(trusted_to_auth_for_delegation)
801         else:
802             self.assertFalse(not_delegated)
803
804         samdb = self.get_samdb()
805
806         user_name = self.get_new_username()
807         if name_prefix is not None:
808             user_name = name_prefix + user_name
809         if name_suffix is not None:
810             user_name += name_suffix
811
812         user_account_control = 0
813         if trusted_to_auth_for_delegation:
814             user_account_control |= UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION
815         if not_delegated:
816             user_account_control |= UF_NOT_DELEGATED
817         if no_auth_data_required:
818             user_account_control |= UF_NO_AUTH_DATA_REQUIRED
819
820         if additional_details:
821             details = {k: v for k, v in additional_details}
822         else:
823             details = {}
824
825         enctypes = supported_enctypes
826         if fast_support:
827             enctypes = enctypes or 0
828             enctypes |= KerberosCredentials.fast_supported_bits
829
830         if enctypes is not None:
831             details['msDS-SupportedEncryptionTypes'] = str(enctypes)
832
833         if delegation_to_spn:
834             details['msDS-AllowedToDelegateTo'] = delegation_to_spn
835
836         if delegation_from_dn:
837             security_descriptor = self.get_security_descriptor(
838                 delegation_from_dn)
839             details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
840                 security_descriptor)
841
842         if spn is None and account_type is not self.AccountType.USER:
843             spn = 'host/' + user_name
844
845         creds, dn = self.create_account(samdb, user_name,
846                                         account_type=account_type,
847                                         upn=upn,
848                                         spn=spn,
849                                         additional_details=details,
850                                         account_control=user_account_control,
851                                         add_dollar=add_dollar,
852                                         expired_password=expired_password)
853
854         keys = self.get_keys(samdb, dn)
855         self.creds_set_keys(creds, keys)
856
857         # Handle secret replication to the RODC.
858
859         if allowed_replication or revealed_to_rodc:
860             rodc_samdb = self.get_rodc_samdb()
861             rodc_dn = self.get_server_dn(rodc_samdb)
862
863             # Allow replicating this account's secrets if requested, or allow
864             # it only temporarily if we're about to replicate them.
865             allowed_cleanup = self.add_to_group(
866                 dn, rodc_dn,
867                 'msDS-RevealOnDemandGroup')
868
869             if revealed_to_rodc:
870                 # Replicate this account's secrets to the RODC.
871                 self.replicate_account_to_rodc(dn)
872
873             if not allowed_replication:
874                 # If we don't want replicating secrets to be allowed for this
875                 # account, disable it again.
876                 samdb.modify(allowed_cleanup)
877
878             self.check_revealed(dn,
879                                 rodc_dn,
880                                 revealed=revealed_to_rodc)
881
882         if denied_replication:
883             rodc_samdb = self.get_rodc_samdb()
884             rodc_dn = self.get_server_dn(rodc_samdb)
885
886             # Deny replicating this account's secrets to the RODC.
887             self.add_to_group(dn, rodc_dn, 'msDS-NeverRevealGroup')
888
889         # Handle secret replication to the mock RODC.
890
891         if allowed_replication_mock or revealed_to_mock_rodc:
892             # Allow replicating this account's secrets if requested, or allow
893             # it only temporarily if we want to add the account to the mock
894             # RODC's msDS-RevealedUsers.
895             rodc_ctx = self.get_mock_rodc_ctx()
896             mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
897
898             allowed_mock_cleanup = self.add_to_group(
899                 dn, mock_rodc_dn,
900                 'msDS-RevealOnDemandGroup')
901
902             if revealed_to_mock_rodc:
903                 # Request replicating this account's secrets to the mock RODC,
904                 # which updates msDS-RevealedUsers.
905                 self.reveal_account_to_mock_rodc(dn)
906
907             if not allowed_replication_mock:
908                 # If we don't want replicating secrets to be allowed for this
909                 # account, disable it again.
910                 samdb.modify(allowed_mock_cleanup)
911
912             self.check_revealed(dn,
913                                 mock_rodc_dn,
914                                 revealed=revealed_to_mock_rodc)
915
916         if denied_replication_mock:
917             # Deny replicating this account's secrets to the mock RODC.
918             rodc_ctx = self.get_mock_rodc_ctx()
919             mock_rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
920
921             self.add_to_group(dn, mock_rodc_dn, 'msDS-NeverRevealGroup')
922
923         if member_of is not None:
924             for group_dn in member_of:
925                 self.add_to_group(dn, ldb.Dn(samdb, group_dn), 'member',
926                                   expect_attr=False)
927
928         if kerberos_enabled:
929             creds.set_kerberos_state(MUST_USE_KERBEROS)
930         else:
931             creds.set_kerberos_state(DONT_USE_KERBEROS)
932
933         if secure_channel_type is not None:
934             creds.set_secure_channel_type(secure_channel_type)
935
936         return creds
937
938     def get_new_username(self):
939         user_name = self.account_base + str(self.account_id)
940         type(self).account_id += 1
941
942         return user_name
943
944     def get_client_creds(self,
945                          allow_missing_password=False,
946                          allow_missing_keys=True):
947         def create_client_account():
948             return self.get_cached_creds(account_type=self.AccountType.USER)
949
950         c = self._get_krb5_creds(prefix='CLIENT',
951                                  allow_missing_password=allow_missing_password,
952                                  allow_missing_keys=allow_missing_keys,
953                                  fallback_creds_fn=create_client_account)
954         return c
955
956     def get_mach_creds(self,
957                        allow_missing_password=False,
958                        allow_missing_keys=True):
959         def create_mach_account():
960             return self.get_cached_creds(account_type=self.AccountType.COMPUTER,
961                                          opts={'fast_support': True})
962
963         c = self._get_krb5_creds(prefix='MAC',
964                                  allow_missing_password=allow_missing_password,
965                                  allow_missing_keys=allow_missing_keys,
966                                  fallback_creds_fn=create_mach_account)
967         return c
968
969     def get_service_creds(self,
970                           allow_missing_password=False,
971                           allow_missing_keys=True):
972         def create_service_account():
973             return self.get_cached_creds(
974                 account_type=self.AccountType.COMPUTER,
975                 opts={
976                     'trusted_to_auth_for_delegation': True,
977                     'fast_support': True
978                 })
979
980         c = self._get_krb5_creds(prefix='SERVICE',
981                                  allow_missing_password=allow_missing_password,
982                                  allow_missing_keys=allow_missing_keys,
983                                  fallback_creds_fn=create_service_account)
984         return c
985
986     def get_rodc_krbtgt_creds(self,
987                               require_keys=True,
988                               require_strongest_key=False):
989         if require_strongest_key:
990             self.assertTrue(require_keys)
991
992         def download_rodc_krbtgt_creds():
993             samdb = self.get_samdb()
994             rodc_samdb = self.get_rodc_samdb()
995
996             rodc_dn = self.get_server_dn(rodc_samdb)
997
998             res = samdb.search(rodc_dn,
999                                scope=ldb.SCOPE_BASE,
1000                                attrs=['msDS-KrbTgtLink'])
1001             krbtgt_dn = res[0]['msDS-KrbTgtLink'][0]
1002
1003             res = samdb.search(krbtgt_dn,
1004                                scope=ldb.SCOPE_BASE,
1005                                attrs=['sAMAccountName',
1006                                       'msDS-KeyVersionNumber',
1007                                       'msDS-SecondaryKrbTgtNumber'])
1008             krbtgt_dn = res[0].dn
1009             username = str(res[0]['sAMAccountName'])
1010
1011             creds = KerberosCredentials()
1012             creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
1013             creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
1014             creds.set_username(username)
1015
1016             kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1017             krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
1018
1019             rodc_kvno = krbtgt_number << 16 | kvno
1020             creds.set_kvno(rodc_kvno)
1021             creds.set_dn(krbtgt_dn)
1022
1023             keys = self.get_keys(samdb, krbtgt_dn)
1024             self.creds_set_keys(creds, keys)
1025
1026             # The RODC krbtgt account should support the default enctypes,
1027             # although it might not have the msDS-SupportedEncryptionTypes
1028             # attribute.
1029             self.creds_set_default_enctypes(
1030                 creds,
1031                 fast_support=self.kdc_fast_support,
1032                 claims_support=self.kdc_claims_support,
1033                 compound_id_support=self.kdc_compound_id_support)
1034
1035             return creds
1036
1037         c = self._get_krb5_creds(prefix='RODC_KRBTGT',
1038                                  allow_missing_password=True,
1039                                  allow_missing_keys=not require_keys,
1040                                  require_strongest_key=require_strongest_key,
1041                                  fallback_creds_fn=download_rodc_krbtgt_creds)
1042         return c
1043
1044     def get_mock_rodc_krbtgt_creds(self,
1045                                    require_keys=True,
1046                                    require_strongest_key=False):
1047         if require_strongest_key:
1048             self.assertTrue(require_keys)
1049
1050         def create_rodc_krbtgt_account():
1051             samdb = self.get_samdb()
1052
1053             rodc_ctx = self.get_mock_rodc_ctx()
1054
1055             krbtgt_dn = rodc_ctx.new_krbtgt_dn
1056
1057             res = samdb.search(base=ldb.Dn(samdb, krbtgt_dn),
1058                                scope=ldb.SCOPE_BASE,
1059                                attrs=['msDS-KeyVersionNumber',
1060                                       'msDS-SecondaryKrbTgtNumber'])
1061             dn = res[0].dn
1062             username = str(rodc_ctx.krbtgt_name)
1063
1064             creds = KerberosCredentials()
1065             creds.set_domain(self.env_get_var('DOMAIN', 'RODC_KRBTGT'))
1066             creds.set_realm(self.env_get_var('REALM', 'RODC_KRBTGT'))
1067             creds.set_username(username)
1068
1069             kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1070             krbtgt_number = int(res[0]['msDS-SecondaryKrbTgtNumber'][0])
1071
1072             rodc_kvno = krbtgt_number << 16 | kvno
1073             creds.set_kvno(rodc_kvno)
1074             creds.set_dn(dn)
1075
1076             keys = self.get_keys(samdb, dn)
1077             self.creds_set_keys(creds, keys)
1078
1079             self.creds_set_enctypes(creds)
1080
1081             return creds
1082
1083         c = self._get_krb5_creds(prefix='MOCK_RODC_KRBTGT',
1084                                  allow_missing_password=True,
1085                                  allow_missing_keys=not require_keys,
1086                                  require_strongest_key=require_strongest_key,
1087                                  fallback_creds_fn=create_rodc_krbtgt_account)
1088         return c
1089
1090     def get_krbtgt_creds(self,
1091                          require_keys=True,
1092                          require_strongest_key=False):
1093         if require_strongest_key:
1094             self.assertTrue(require_keys)
1095
1096         def download_krbtgt_creds():
1097             samdb = self.get_samdb()
1098
1099             krbtgt_rid = security.DOMAIN_RID_KRBTGT
1100             krbtgt_sid = '%s-%d' % (samdb.get_domain_sid(), krbtgt_rid)
1101
1102             res = samdb.search(base='<SID=%s>' % krbtgt_sid,
1103                                scope=ldb.SCOPE_BASE,
1104                                attrs=['sAMAccountName',
1105                                       'msDS-KeyVersionNumber'])
1106             dn = res[0].dn
1107             username = str(res[0]['sAMAccountName'])
1108
1109             creds = KerberosCredentials()
1110             creds.set_domain(self.env_get_var('DOMAIN', 'KRBTGT'))
1111             creds.set_realm(self.env_get_var('REALM', 'KRBTGT'))
1112             creds.set_username(username)
1113
1114             kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1115             creds.set_kvno(kvno)
1116             creds.set_dn(dn)
1117
1118             keys = self.get_keys(samdb, dn)
1119             self.creds_set_keys(creds, keys)
1120
1121             # The krbtgt account should support the default enctypes, although
1122             # it might not (on Samba) have the msDS-SupportedEncryptionTypes
1123             # attribute.
1124             self.creds_set_default_enctypes(
1125                 creds,
1126                 fast_support=self.kdc_fast_support,
1127                 claims_support=self.kdc_claims_support,
1128                 compound_id_support=self.kdc_compound_id_support)
1129
1130             return creds
1131
1132         c = self._get_krb5_creds(prefix='KRBTGT',
1133                                  default_username='krbtgt',
1134                                  allow_missing_password=True,
1135                                  allow_missing_keys=not require_keys,
1136                                  require_strongest_key=require_strongest_key,
1137                                  fallback_creds_fn=download_krbtgt_creds)
1138         return c
1139
1140     def get_dc_creds(self,
1141                      require_keys=True,
1142                      require_strongest_key=False):
1143         if require_strongest_key:
1144             self.assertTrue(require_keys)
1145
1146         def download_dc_creds():
1147             samdb = self.get_samdb()
1148
1149             dc_rid = 1000
1150             dc_sid = '%s-%d' % (samdb.get_domain_sid(), dc_rid)
1151
1152             res = samdb.search(base='<SID=%s>' % dc_sid,
1153                                scope=ldb.SCOPE_BASE,
1154                                attrs=['sAMAccountName',
1155                                       'msDS-KeyVersionNumber'])
1156             dn = res[0].dn
1157             username = str(res[0]['sAMAccountName'])
1158
1159             creds = KerberosCredentials()
1160             creds.set_domain(self.env_get_var('DOMAIN', 'DC'))
1161             creds.set_realm(self.env_get_var('REALM', 'DC'))
1162             creds.set_username(username)
1163
1164             kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1165             creds.set_kvno(kvno)
1166             creds.set_workstation(username[:-1])
1167             creds.set_dn(dn)
1168
1169             keys = self.get_keys(samdb, dn)
1170             self.creds_set_keys(creds, keys)
1171
1172             self.creds_set_enctypes(creds)
1173
1174             return creds
1175
1176         c = self._get_krb5_creds(prefix='DC',
1177                                  allow_missing_password=True,
1178                                  allow_missing_keys=not require_keys,
1179                                  require_strongest_key=require_strongest_key,
1180                                  fallback_creds_fn=download_dc_creds)
1181         return c
1182
1183     def get_server_creds(self,
1184                      require_keys=True,
1185                      require_strongest_key=False):
1186         if require_strongest_key:
1187             self.assertTrue(require_keys)
1188
1189         def download_server_creds():
1190             samdb = self.get_samdb()
1191
1192             res = samdb.search(base=samdb.get_default_basedn(),
1193                                expression=(f'(|(sAMAccountName={self.host}*)'
1194                                            f'(dNSHostName={self.host}))'),
1195                                scope=ldb.SCOPE_SUBTREE,
1196                                attrs=['sAMAccountName',
1197                                       'msDS-KeyVersionNumber'])
1198             self.assertEqual(1, len(res))
1199             dn = res[0].dn
1200             username = str(res[0]['sAMAccountName'])
1201
1202             creds = KerberosCredentials()
1203             creds.set_domain(self.env_get_var('DOMAIN', 'SERVER'))
1204             creds.set_realm(self.env_get_var('REALM', 'SERVER'))
1205             creds.set_username(username)
1206
1207             kvno = int(res[0]['msDS-KeyVersionNumber'][0])
1208             creds.set_kvno(kvno)
1209             creds.set_dn(dn)
1210
1211             keys = self.get_keys(samdb, dn)
1212             self.creds_set_keys(creds, keys)
1213
1214             self.creds_set_enctypes(creds)
1215
1216             return creds
1217
1218         c = self._get_krb5_creds(prefix='SERVER',
1219                                  allow_missing_password=True,
1220                                  allow_missing_keys=not require_keys,
1221                                  require_strongest_key=require_strongest_key,
1222                                  fallback_creds_fn=download_server_creds)
1223         return c
1224
1225     def as_req(self, cname, sname, realm, etypes, padata=None, kdc_options=0):
1226         '''Send a Kerberos AS_REQ, returns the undecoded response
1227         '''
1228
1229         till = self.get_KerberosTime(offset=36000)
1230
1231         req = self.AS_REQ_create(padata=padata,
1232                                  kdc_options=str(kdc_options),
1233                                  cname=cname,
1234                                  realm=realm,
1235                                  sname=sname,
1236                                  from_time=None,
1237                                  till_time=till,
1238                                  renew_time=None,
1239                                  nonce=0x7fffffff,
1240                                  etypes=etypes,
1241                                  addresses=None,
1242                                  additional_tickets=None)
1243         rep = self.send_recv_transaction(req)
1244         return rep
1245
1246     def get_as_rep_key(self, creds, rep):
1247         '''Extract the session key from an AS-REP
1248         '''
1249         rep_padata = self.der_decode(
1250             rep['e-data'],
1251             asn1Spec=krb5_asn1.METHOD_DATA())
1252
1253         for pa in rep_padata:
1254             if pa['padata-type'] == PADATA_ETYPE_INFO2:
1255                 padata_value = pa['padata-value']
1256                 break
1257
1258         etype_info2 = self.der_decode(
1259             padata_value, asn1Spec=krb5_asn1.ETYPE_INFO2())
1260
1261         key = self.PasswordKey_from_etype_info2(creds, etype_info2[0],
1262                                                 creds.get_kvno())
1263         return key
1264
1265     def get_enc_timestamp_pa_data(self, creds, rep, skew=0):
1266         '''generate the pa_data data element for an AS-REQ
1267         '''
1268
1269         key = self.get_as_rep_key(creds, rep)
1270
1271         return self.get_enc_timestamp_pa_data_from_key(key, skew=skew)
1272
1273     def get_enc_timestamp_pa_data_from_key(self, key, skew=0):
1274         (patime, pausec) = self.get_KerberosTimeWithUsec(offset=skew)
1275         padata = self.PA_ENC_TS_ENC_create(patime, pausec)
1276         padata = self.der_encode(padata, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
1277
1278         padata = self.EncryptedData_create(key, KU_PA_ENC_TIMESTAMP, padata)
1279         padata = self.der_encode(padata, asn1Spec=krb5_asn1.EncryptedData())
1280
1281         padata = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, padata)
1282
1283         return padata
1284
1285     def get_challenge_pa_data(self, client_challenge_key, skew=0):
1286         patime, pausec = self.get_KerberosTimeWithUsec(offset=skew)
1287         padata = self.PA_ENC_TS_ENC_create(patime, pausec)
1288         padata = self.der_encode(padata,
1289                                  asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
1290
1291         padata = self.EncryptedData_create(client_challenge_key,
1292                                            KU_ENC_CHALLENGE_CLIENT,
1293                                            padata)
1294         padata = self.der_encode(padata,
1295                                  asn1Spec=krb5_asn1.EncryptedData())
1296
1297         padata = self.PA_DATA_create(PADATA_ENCRYPTED_CHALLENGE,
1298                                      padata)
1299
1300         return padata
1301
1302     def get_as_rep_enc_data(self, key, rep):
1303         ''' Decrypt and Decode the encrypted data in an AS-REP
1304         '''
1305         enc_part = key.decrypt(KU_AS_REP_ENC_PART, rep['enc-part']['cipher'])
1306         # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
1307         # application tag 26
1308         try:
1309             enc_part = self.der_decode(
1310                 enc_part, asn1Spec=krb5_asn1.EncASRepPart())
1311         except Exception:
1312             enc_part = self.der_decode(
1313                 enc_part, asn1Spec=krb5_asn1.EncTGSRepPart())
1314
1315         return enc_part
1316
1317     def check_pre_authentication(self, rep):
1318         """ Check that the kdc response was pre-authentication required
1319         """
1320         self.check_error_rep(rep, KDC_ERR_PREAUTH_REQUIRED)
1321
1322     def check_as_reply(self, rep):
1323         """ Check that the kdc response is an AS-REP and that the
1324             values for:
1325                 msg-type
1326                 pvno
1327                 tkt-pvno
1328                 kvno
1329             match the expected values
1330         """
1331         self.check_reply(rep, msg_type=KRB_AS_REP)
1332
1333     def check_tgs_reply(self, rep):
1334         """ Check that the kdc response is an TGS-REP and that the
1335             values for:
1336                 msg-type
1337                 pvno
1338                 tkt-pvno
1339                 kvno
1340             match the expected values
1341         """
1342         self.check_reply(rep, msg_type=KRB_TGS_REP)
1343
1344     def check_reply(self, rep, msg_type):
1345
1346         # Should have a reply, and it should an TGS-REP message.
1347         self.assertIsNotNone(rep)
1348         self.assertEqual(rep['msg-type'], msg_type, "rep = {%s}" % rep)
1349
1350         # Protocol version number should be 5
1351         pvno = int(rep['pvno'])
1352         self.assertEqual(5, pvno, "rep = {%s}" % rep)
1353
1354         # The ticket version number should be 5
1355         tkt_vno = int(rep['ticket']['tkt-vno'])
1356         self.assertEqual(5, tkt_vno, "rep = {%s}" % rep)
1357
1358         # Check that the kvno is not an RODC kvno
1359         # MIT kerberos does not provide the kvno, so we treat it as optional.
1360         # This is tested in compatability_test.py
1361         if 'kvno' in rep['enc-part']:
1362             kvno = int(rep['enc-part']['kvno'])
1363             # If the high order bits are set this is an RODC kvno.
1364             self.assertEqual(0, kvno & 0xFFFF0000, "rep = {%s}" % rep)
1365
1366     def check_error_rep(self, rep, expected):
1367         """ Check that the reply is an error message, with the expected
1368             error-code specified.
1369         """
1370         self.assertIsNotNone(rep)
1371         self.assertEqual(rep['msg-type'], KRB_ERROR, "rep = {%s}" % rep)
1372         if isinstance(expected, collections.abc.Container):
1373             self.assertIn(rep['error-code'], expected, "rep = {%s}" % rep)
1374         else:
1375             self.assertEqual(rep['error-code'], expected, "rep = {%s}" % rep)
1376
1377     def tgs_req(self, cname, sname, realm, ticket, key, etypes,
1378                 expected_error_mode=0, padata=None, kdc_options=0,
1379                 to_rodc=False, service_creds=None, expect_pac=True,
1380                 expect_edata=None, expected_flags=None, unexpected_flags=None):
1381         '''Send a TGS-REQ, returns the response and the decrypted and
1382            decoded enc-part
1383         '''
1384
1385         subkey = self.RandomKey(key.etype)
1386
1387         (ctime, cusec) = self.get_KerberosTimeWithUsec()
1388
1389         tgt = KerberosTicketCreds(ticket,
1390                                   key,
1391                                   crealm=realm,
1392                                   cname=cname)
1393
1394         if service_creds is not None:
1395             decryption_key = self.TicketDecryptionKey_from_creds(
1396                 service_creds)
1397             expected_supported_etypes = service_creds.tgs_supported_enctypes
1398         else:
1399             decryption_key = None
1400             expected_supported_etypes = None
1401
1402         if not expected_error_mode:
1403             check_error_fn = None
1404             check_rep_fn = self.generic_check_kdc_rep
1405         else:
1406             check_error_fn = self.generic_check_kdc_error
1407             check_rep_fn = None
1408
1409         def generate_padata(_kdc_exchange_dict,
1410                             _callback_dict,
1411                             req_body):
1412
1413             return padata, req_body
1414
1415         kdc_exchange_dict = self.tgs_exchange_dict(
1416             expected_crealm=realm,
1417             expected_cname=cname,
1418             expected_srealm=realm,
1419             expected_sname=sname,
1420             expected_error_mode=expected_error_mode,
1421             expected_flags=expected_flags,
1422             unexpected_flags=unexpected_flags,
1423             expected_supported_etypes=expected_supported_etypes,
1424             check_error_fn=check_error_fn,
1425             check_rep_fn=check_rep_fn,
1426             check_kdc_private_fn=self.generic_check_kdc_private,
1427             ticket_decryption_key=decryption_key,
1428             generate_padata_fn=generate_padata if padata is not None else None,
1429             tgt=tgt,
1430             authenticator_subkey=subkey,
1431             kdc_options=str(kdc_options),
1432             expect_edata=expect_edata,
1433             expect_pac=expect_pac,
1434             to_rodc=to_rodc)
1435
1436         rep = self._generic_kdc_exchange(kdc_exchange_dict,
1437                                          cname=None,
1438                                          realm=realm,
1439                                          sname=sname,
1440                                          etypes=etypes)
1441
1442         if expected_error_mode:
1443             enc_part = None
1444         else:
1445             ticket_creds = kdc_exchange_dict['rep_ticket_creds']
1446             enc_part = ticket_creds.encpart_private
1447
1448         return rep, enc_part
1449
1450     def get_service_ticket(self, tgt, target_creds, service='host',
1451                            target_name=None, till=None, rc4_support=True,
1452                            to_rodc=False, kdc_options=None,
1453                            expected_flags=None, unexpected_flags=None,
1454                            pac_request=True, expect_pac=True, fresh=False):
1455         user_name = tgt.cname['name-string'][0]
1456         ticket_sname = tgt.sname
1457         if target_name is None:
1458             target_name = target_creds.get_username()[:-1]
1459         cache_key = (user_name, target_name, service, to_rodc, kdc_options,
1460                      pac_request, str(expected_flags), str(unexpected_flags),
1461                      till, rc4_support,
1462                      str(ticket_sname),
1463                      expect_pac)
1464
1465         if not fresh:
1466             ticket = self.tkt_cache.get(cache_key)
1467
1468             if ticket is not None:
1469                 return ticket
1470
1471         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1472
1473         if kdc_options is None:
1474             kdc_options = '0'
1475         kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
1476
1477         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1478                                           names=[service, target_name])
1479         srealm = target_creds.get_realm()
1480
1481         authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
1482
1483         decryption_key = self.TicketDecryptionKey_from_creds(target_creds)
1484
1485         kdc_exchange_dict = self.tgs_exchange_dict(
1486             expected_crealm=tgt.crealm,
1487             expected_cname=tgt.cname,
1488             expected_srealm=srealm,
1489             expected_sname=sname,
1490             expected_supported_etypes=target_creds.tgs_supported_enctypes,
1491             expected_flags=expected_flags,
1492             unexpected_flags=unexpected_flags,
1493             ticket_decryption_key=decryption_key,
1494             check_rep_fn=self.generic_check_kdc_rep,
1495             check_kdc_private_fn=self.generic_check_kdc_private,
1496             tgt=tgt,
1497             authenticator_subkey=authenticator_subkey,
1498             kdc_options=kdc_options,
1499             pac_request=pac_request,
1500             expect_pac=expect_pac,
1501             rc4_support=rc4_support,
1502             to_rodc=to_rodc)
1503
1504         rep = self._generic_kdc_exchange(kdc_exchange_dict,
1505                                          cname=None,
1506                                          realm=srealm,
1507                                          sname=sname,
1508                                          till_time=till,
1509                                          etypes=etype)
1510         self.check_tgs_reply(rep)
1511
1512         service_ticket_creds = kdc_exchange_dict['rep_ticket_creds']
1513
1514         if to_rodc:
1515             krbtgt_creds = self.get_rodc_krbtgt_creds()
1516         else:
1517             krbtgt_creds = self.get_krbtgt_creds()
1518         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1519         self.verify_ticket(service_ticket_creds, krbtgt_key,
1520                            service_ticket=True, expect_pac=expect_pac,
1521                            expect_ticket_checksum=self.tkt_sig_support)
1522
1523         self.tkt_cache[cache_key] = service_ticket_creds
1524
1525         return service_ticket_creds
1526
1527     def get_tgt(self, creds, to_rodc=False, kdc_options=None,
1528                 client_account=None, client_name_type=NT_PRINCIPAL,
1529                 expected_flags=None, unexpected_flags=None,
1530                 expected_account_name=None, expected_upn_name=None,
1531                 expected_cname=None,
1532                 expected_sid=None,
1533                 sname=None, realm=None,
1534                 pac_request=True, expect_pac=True,
1535                 expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
1536                 expect_requester_sid=None,
1537                 rc4_support=True,
1538                 fresh=False):
1539         if client_account is not None:
1540             user_name = client_account
1541         else:
1542             user_name = creds.get_username()
1543
1544         cache_key = (user_name, to_rodc, kdc_options, pac_request,
1545                      client_name_type,
1546                      str(expected_flags), str(unexpected_flags),
1547                      expected_account_name, expected_upn_name, expected_sid,
1548                      str(sname), str(realm),
1549                      str(expected_cname),
1550                      rc4_support,
1551                      expect_pac, expect_pac_attrs,
1552                      expect_pac_attrs_pac_request, expect_requester_sid)
1553
1554         if not fresh:
1555             tgt = self.tkt_cache.get(cache_key)
1556
1557             if tgt is not None:
1558                 return tgt
1559
1560         if realm is None:
1561             realm = creds.get_realm()
1562
1563         salt = creds.get_salt()
1564
1565         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1566         cname = self.PrincipalName_create(name_type=client_name_type,
1567                                           names=user_name.split('/'))
1568         if sname is None:
1569             sname = self.PrincipalName_create(name_type=NT_SRV_INST,
1570                                               names=['krbtgt', realm])
1571             expected_sname = self.PrincipalName_create(
1572                 name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
1573         else:
1574             expected_sname = sname
1575
1576         if expected_cname is None:
1577             expected_cname = cname
1578
1579         till = self.get_KerberosTime(offset=36000)
1580
1581         if to_rodc:
1582             krbtgt_creds = self.get_rodc_krbtgt_creds()
1583         else:
1584             krbtgt_creds = self.get_krbtgt_creds()
1585         ticket_decryption_key = (
1586             self.TicketDecryptionKey_from_creds(krbtgt_creds))
1587
1588         expected_etypes = krbtgt_creds.tgs_supported_enctypes
1589
1590         if kdc_options is None:
1591             kdc_options = ('forwardable,'
1592                            'renewable,'
1593                            'canonicalize,'
1594                            'renewable-ok')
1595         kdc_options = krb5_asn1.KDCOptions(kdc_options)
1596
1597         pac_options = '1'  # supports claims
1598
1599         rep, kdc_exchange_dict = self._test_as_exchange(
1600             cname=cname,
1601             realm=realm,
1602             sname=sname,
1603             till=till,
1604             client_as_etypes=etype,
1605             expected_error_mode=KDC_ERR_PREAUTH_REQUIRED,
1606             expected_crealm=realm,
1607             expected_cname=expected_cname,
1608             expected_srealm=realm,
1609             expected_sname=sname,
1610             expected_account_name=expected_account_name,
1611             expected_upn_name=expected_upn_name,
1612             expected_sid=expected_sid,
1613             expected_salt=salt,
1614             expected_flags=expected_flags,
1615             unexpected_flags=unexpected_flags,
1616             expected_supported_etypes=expected_etypes,
1617             etypes=etype,
1618             padata=None,
1619             kdc_options=kdc_options,
1620             preauth_key=None,
1621             ticket_decryption_key=ticket_decryption_key,
1622             pac_request=pac_request,
1623             pac_options=pac_options,
1624             expect_pac=expect_pac,
1625             expect_pac_attrs=expect_pac_attrs,
1626             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
1627             expect_requester_sid=expect_requester_sid,
1628             rc4_support=rc4_support,
1629             to_rodc=to_rodc)
1630         self.check_pre_authentication(rep)
1631
1632         etype_info2 = kdc_exchange_dict['preauth_etype_info2']
1633
1634         preauth_key = self.PasswordKey_from_etype_info2(creds,
1635                                                         etype_info2[0],
1636                                                         creds.get_kvno())
1637
1638         ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
1639
1640         padata = [ts_enc_padata]
1641
1642         expected_realm = realm.upper()
1643
1644         rep, kdc_exchange_dict = self._test_as_exchange(
1645             cname=cname,
1646             realm=realm,
1647             sname=sname,
1648             till=till,
1649             client_as_etypes=etype,
1650             expected_error_mode=0,
1651             expected_crealm=expected_realm,
1652             expected_cname=expected_cname,
1653             expected_srealm=expected_realm,
1654             expected_sname=expected_sname,
1655             expected_account_name=expected_account_name,
1656             expected_upn_name=expected_upn_name,
1657             expected_sid=expected_sid,
1658             expected_salt=salt,
1659             expected_flags=expected_flags,
1660             unexpected_flags=unexpected_flags,
1661             expected_supported_etypes=expected_etypes,
1662             etypes=etype,
1663             padata=padata,
1664             kdc_options=kdc_options,
1665             preauth_key=preauth_key,
1666             ticket_decryption_key=ticket_decryption_key,
1667             pac_request=pac_request,
1668             pac_options=pac_options,
1669             expect_pac=expect_pac,
1670             expect_pac_attrs=expect_pac_attrs,
1671             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
1672             expect_requester_sid=expect_requester_sid,
1673             rc4_support=rc4_support,
1674             to_rodc=to_rodc)
1675         self.check_as_reply(rep)
1676
1677         ticket_creds = kdc_exchange_dict['rep_ticket_creds']
1678
1679         self.tkt_cache[cache_key] = ticket_creds
1680
1681         return ticket_creds
1682
1683     def _make_tgs_request(self, client_creds, service_creds, tgt,
1684                           client_account=None,
1685                           client_name_type=NT_PRINCIPAL,
1686                           kdc_options=None,
1687                           pac_request=None, expect_pac=True,
1688                           expect_error=False,
1689                           expected_cname=None,
1690                           expected_account_name=None,
1691                           expected_upn_name=None,
1692                           expected_sid=None):
1693         if client_account is None:
1694             client_account = client_creds.get_username()
1695         cname = self.PrincipalName_create(name_type=client_name_type,
1696                                           names=client_account.split('/'))
1697
1698         service_account = service_creds.get_username()
1699         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1700                                           names=[service_account])
1701
1702         realm = service_creds.get_realm()
1703
1704         expected_crealm = realm
1705         if expected_cname is None:
1706             expected_cname = cname
1707         expected_srealm = realm
1708         expected_sname = sname
1709
1710         expected_supported_etypes = service_creds.tgs_supported_enctypes
1711
1712         etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1713
1714         if kdc_options is None:
1715             kdc_options = 'canonicalize'
1716         kdc_options = str(krb5_asn1.KDCOptions(kdc_options))
1717
1718         target_decryption_key = self.TicketDecryptionKey_from_creds(
1719             service_creds)
1720
1721         authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
1722
1723         if expect_error:
1724             expected_error_mode = KDC_ERR_TGT_REVOKED
1725             check_error_fn = self.generic_check_kdc_error
1726             check_rep_fn = None
1727         else:
1728             expected_error_mode = 0
1729             check_error_fn = None
1730             check_rep_fn = self.generic_check_kdc_rep
1731
1732         kdc_exchange_dict = self.tgs_exchange_dict(
1733             expected_crealm=expected_crealm,
1734             expected_cname=expected_cname,
1735             expected_srealm=expected_srealm,
1736             expected_sname=expected_sname,
1737             expected_account_name=expected_account_name,
1738             expected_upn_name=expected_upn_name,
1739             expected_sid=expected_sid,
1740             expected_supported_etypes=expected_supported_etypes,
1741             ticket_decryption_key=target_decryption_key,
1742             check_error_fn=check_error_fn,
1743             check_rep_fn=check_rep_fn,
1744             check_kdc_private_fn=self.generic_check_kdc_private,
1745             expected_error_mode=expected_error_mode,
1746             tgt=tgt,
1747             authenticator_subkey=authenticator_subkey,
1748             kdc_options=kdc_options,
1749             pac_request=pac_request,
1750             expect_pac=expect_pac,
1751             expect_edata=False)
1752
1753         rep = self._generic_kdc_exchange(kdc_exchange_dict,
1754                                          cname=cname,
1755                                          realm=realm,
1756                                          sname=sname,
1757                                          etypes=etypes)
1758         if expect_error:
1759             self.check_error_rep(rep, expected_error_mode)
1760
1761             return None
1762         else:
1763             self.check_reply(rep, KRB_TGS_REP)
1764
1765             return kdc_exchange_dict['rep_ticket_creds']
1766
1767     # Named tuple to contain values of interest when the PAC is decoded.
1768     PacData = namedtuple(
1769         "PacData",
1770         "account_name account_sid logon_name upn domain_name")
1771
1772     def get_pac_data(self, authorization_data):
1773         '''Decode the PAC element contained in the authorization-data element
1774         '''
1775         account_name = None
1776         user_sid = None
1777         logon_name = None
1778         upn = None
1779         domain_name = None
1780
1781         # The PAC data will be wrapped in an AD_IF_RELEVANT element
1782         ad_if_relevant_elements = (
1783             x for x in authorization_data if x['ad-type'] == AD_IF_RELEVANT)
1784         for dt in ad_if_relevant_elements:
1785             buf = self.der_decode(
1786                 dt['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT())
1787             # The PAC data is further wrapped in a AD_WIN2K_PAC element
1788             for ad in (x for x in buf if x['ad-type'] == AD_WIN2K_PAC):
1789                 pb = ndr_unpack(krb5pac.PAC_DATA, ad['ad-data'])
1790                 for pac in pb.buffers:
1791                     if pac.type == krb5pac.PAC_TYPE_LOGON_INFO:
1792                         account_name = (
1793                             pac.info.info.info3.base.account_name)
1794                         user_sid = (
1795                             str(pac.info.info.info3.base.domain_sid)
1796                             + "-" + str(pac.info.info.info3.base.rid))
1797                     elif pac.type == krb5pac.PAC_TYPE_LOGON_NAME:
1798                         logon_name = pac.info.account_name
1799                     elif pac.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
1800                         upn = pac.info.upn_name
1801                         domain_name = pac.info.dns_domain_name
1802
1803         return self.PacData(
1804             account_name,
1805             user_sid,
1806             logon_name,
1807             upn,
1808             domain_name)
1809
1810     def decode_service_ticket(self, creds, ticket):
1811         '''Decrypt and decode a service ticket
1812         '''
1813
1814         name = creds.get_username()
1815         if name.endswith('$'):
1816             name = name[:-1]
1817         realm = creds.get_realm()
1818         salt = "%s.%s@%s" % (name, realm.lower(), realm.upper())
1819
1820         key = self.PasswordKey_create(
1821             ticket['enc-part']['etype'],
1822             creds.get_password(),
1823             salt,
1824             ticket['enc-part']['kvno'])
1825
1826         enc_part = key.decrypt(KU_TICKET, ticket['enc-part']['cipher'])
1827         enc_ticket_part = self.der_decode(
1828             enc_part, asn1Spec=krb5_asn1.EncTicketPart())
1829         return enc_ticket_part
1830
1831     def modify_ticket_flag(self, enc_part, flag, value):
1832         self.assertIsInstance(value, bool)
1833
1834         flag = krb5_asn1.TicketFlags(flag)
1835         pos = len(tuple(flag)) - 1
1836
1837         flags = enc_part['flags']
1838         self.assertLessEqual(pos, len(flags))
1839
1840         new_flags = flags[:pos] + str(int(value)) + flags[pos + 1:]
1841         enc_part['flags'] = new_flags
1842
1843         return enc_part
1844
1845     def get_objectSid(self, samdb, dn):
1846         ''' Get the objectSID for a DN
1847             Note: performs an Ldb query.
1848         '''
1849         res = samdb.search(dn, scope=SCOPE_BASE, attrs=["objectSID"])
1850         self.assertTrue(len(res) == 1, "did not get objectSid for %s" % dn)
1851         sid = samdb.schema_format_value("objectSID", res[0]["objectSID"][0])
1852         return sid.decode('utf8')
1853
1854     def add_attribute(self, samdb, dn_str, name, value):
1855         if isinstance(value, list):
1856             values = value
1857         else:
1858             values = [value]
1859         flag = ldb.FLAG_MOD_ADD
1860
1861         dn = ldb.Dn(samdb, dn_str)
1862         msg = ldb.Message(dn)
1863         msg[name] = ldb.MessageElement(values, flag, name)
1864         samdb.modify(msg)
1865
1866     def modify_attribute(self, samdb, dn_str, name, value):
1867         if isinstance(value, list):
1868             values = value
1869         else:
1870             values = [value]
1871         flag = ldb.FLAG_MOD_REPLACE
1872
1873         dn = ldb.Dn(samdb, dn_str)
1874         msg = ldb.Message(dn)
1875         msg[name] = ldb.MessageElement(values, flag, name)
1876         samdb.modify(msg)
1877
1878     def create_ccache(self, cname, ticket, enc_part):
1879         """ Lay out a version 4 on-disk credentials cache, to be read using the
1880             FILE: protocol.
1881         """
1882
1883         field = krb5ccache.DELTATIME_TAG()
1884         field.kdc_sec_offset = 0
1885         field.kdc_usec_offset = 0
1886
1887         v4tag = krb5ccache.V4TAG()
1888         v4tag.tag = 1
1889         v4tag.field = field
1890
1891         v4tags = krb5ccache.V4TAGS()
1892         v4tags.tag = v4tag
1893         v4tags.further_tags = b''
1894
1895         optional_header = krb5ccache.V4HEADER()
1896         optional_header.v4tags = v4tags
1897
1898         cname_string = cname['name-string']
1899
1900         cprincipal = krb5ccache.PRINCIPAL()
1901         cprincipal.name_type = cname['name-type']
1902         cprincipal.component_count = len(cname_string)
1903         cprincipal.realm = ticket['realm']
1904         cprincipal.components = cname_string
1905
1906         sname = ticket['sname']
1907         sname_string = sname['name-string']
1908
1909         sprincipal = krb5ccache.PRINCIPAL()
1910         sprincipal.name_type = sname['name-type']
1911         sprincipal.component_count = len(sname_string)
1912         sprincipal.realm = ticket['realm']
1913         sprincipal.components = sname_string
1914
1915         key = self.EncryptionKey_import(enc_part['key'])
1916
1917         key_data = key.export_obj()
1918         keyblock = krb5ccache.KEYBLOCK()
1919         keyblock.enctype = key_data['keytype']
1920         keyblock.data = key_data['keyvalue']
1921
1922         addresses = krb5ccache.ADDRESSES()
1923         addresses.count = 0
1924         addresses.data = []
1925
1926         authdata = krb5ccache.AUTHDATA()
1927         authdata.count = 0
1928         authdata.data = []
1929
1930         # Re-encode the ticket, since it was decoded by another layer.
1931         ticket_data = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket())
1932
1933         authtime = enc_part['authtime']
1934         starttime = enc_part.get('starttime', authtime)
1935         endtime = enc_part['endtime']
1936
1937         cred = krb5ccache.CREDENTIAL()
1938         cred.client = cprincipal
1939         cred.server = sprincipal
1940         cred.keyblock = keyblock
1941         cred.authtime = self.get_EpochFromKerberosTime(authtime)
1942         cred.starttime = self.get_EpochFromKerberosTime(starttime)
1943         cred.endtime = self.get_EpochFromKerberosTime(endtime)
1944
1945         # Account for clock skew of up to five minutes.
1946         self.assertLess(cred.authtime - 5 * 60,
1947                         datetime.now(timezone.utc).timestamp(),
1948                         "Ticket not yet valid - clocks may be out of sync.")
1949         self.assertLess(cred.starttime - 5 * 60,
1950                         datetime.now(timezone.utc).timestamp(),
1951                         "Ticket not yet valid - clocks may be out of sync.")
1952         self.assertGreater(cred.endtime - 60 * 60,
1953                            datetime.now(timezone.utc).timestamp(),
1954                            "Ticket already expired/about to expire - "
1955                            "clocks may be out of sync.")
1956
1957         cred.renew_till = cred.endtime
1958         cred.is_skey = 0
1959         cred.ticket_flags = int(enc_part['flags'], 2)
1960         cred.addresses = addresses
1961         cred.authdata = authdata
1962         cred.ticket = ticket_data
1963         cred.second_ticket = b''
1964
1965         ccache = krb5ccache.CCACHE()
1966         ccache.pvno = 5
1967         ccache.version = 4
1968         ccache.optional_header = optional_header
1969         ccache.principal = cprincipal
1970         ccache.cred = cred
1971
1972         # Serialise the credentials cache structure.
1973         result = ndr_pack(ccache)
1974
1975         # Create a temporary file and write the credentials.
1976         cachefile = tempfile.NamedTemporaryFile(dir=self.tempdir, delete=False)
1977         cachefile.write(result)
1978         cachefile.close()
1979
1980         return cachefile
1981
1982     def create_ccache_with_user(self, user_credentials, mach_credentials,
1983                                 service="host", target_name=None, pac=True):
1984         # Obtain a service ticket authorising the user and place it into a
1985         # newly created credentials cache file.
1986
1987         user_name = user_credentials.get_username()
1988         realm = user_credentials.get_realm()
1989
1990         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1991                                           names=[user_name])
1992
1993         tgt = self.get_tgt(user_credentials)
1994
1995         # Request a ticket to the host service on the machine account
1996         ticket = self.get_service_ticket(tgt, mach_credentials,
1997                                          service=service,
1998                                          target_name=target_name)
1999
2000         if not pac:
2001             ticket = self.modified_ticket(ticket, exclude_pac=True)
2002
2003         # Write the ticket into a credentials cache file that can be ingested
2004         # by the main credentials code.
2005         cachefile = self.create_ccache(cname, ticket.ticket,
2006                                        ticket.encpart_private)
2007
2008         # Create a credentials object to reference the credentials cache.
2009         creds = Credentials()
2010         creds.set_kerberos_state(MUST_USE_KERBEROS)
2011         creds.set_username(user_name, SPECIFIED)
2012         creds.set_realm(realm)
2013         creds.set_named_ccache(cachefile.name, SPECIFIED, self.get_lp())
2014
2015         # Return the credentials along with the cache file.
2016         return (creds, cachefile)