2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from samba import dsdb, ntstatus
28 from samba.dcerpc import krb5pac
30 sys.path.insert(0, "bin/python")
31 os.environ["PYTHONUNBUFFERED"] = "1"
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.rfc4120_constants import (
36 AES256_CTS_HMAC_SHA1_96,
42 KDC_ERR_CLIENT_NAME_MISMATCH,
46 KDC_ERR_S_PRINCIPAL_UNKNOWN,
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
53 global_asn1_print = False
54 global_hexdump = False
57 class KdcTgsTests(KDCBaseTest):
61 self.do_asn1_print = global_asn1_print
62 self.do_hexdump = global_hexdump
64 def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
65 ''' Try and obtain a ticket from the TGS, but supply a cname
66 that differs from that provided to the krbtgt
68 # Create the user account
69 samdb = self.get_samdb()
70 user_name = "tsttktusr"
71 (uc, _) = self.create_account(samdb, user_name)
72 realm = uc.get_realm().lower()
74 # Do the initial AS-REQ, should get a pre-authentication required
76 etype = (AES256_CTS_HMAC_SHA1_96,)
77 cname = self.PrincipalName_create(
78 name_type=NT_PRINCIPAL, names=[user_name])
79 sname = self.PrincipalName_create(
80 name_type=NT_SRV_INST, names=["krbtgt", realm])
82 rep = self.as_req(cname, sname, realm, etype)
83 self.check_pre_authentication(rep)
86 padata = self.get_enc_timestamp_pa_data(uc, rep)
87 key = self.get_as_rep_key(uc, rep)
88 rep = self.as_req(cname, sname, realm, etype, padata=[padata])
89 self.check_as_reply(rep)
91 # Request a service ticket, but use a cname that does not match
92 # that in the original AS-REQ
93 enc_part2 = self.get_as_rep_enc_data(key, rep)
94 key = self.EncryptionKey_import(enc_part2['key'])
95 ticket = rep['ticket']
97 cname = self.PrincipalName_create(
98 name_type=NT_PRINCIPAL,
99 names=["Administrator"])
100 sname = self.PrincipalName_create(
101 name_type=NT_PRINCIPAL,
102 names=["host", samdb.host_dns_name()])
104 (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
105 expected_error_mode=KDC_ERR_BADMATCH,
110 "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
111 self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
117 def test_ldap_service_ticket(self):
118 '''Get a ticket to the ldap service
120 # Create the user account
121 samdb = self.get_samdb()
122 user_name = "tsttktusr"
123 (uc, _) = self.create_account(samdb, user_name)
124 realm = uc.get_realm().lower()
126 # Do the initial AS-REQ, should get a pre-authentication required
128 etype = (AES256_CTS_HMAC_SHA1_96,)
129 cname = self.PrincipalName_create(
130 name_type=NT_PRINCIPAL, names=[user_name])
131 sname = self.PrincipalName_create(
132 name_type=NT_SRV_INST, names=["krbtgt", realm])
134 rep = self.as_req(cname, sname, realm, etype)
135 self.check_pre_authentication(rep)
138 padata = self.get_enc_timestamp_pa_data(uc, rep)
139 key = self.get_as_rep_key(uc, rep)
140 rep = self.as_req(cname, sname, realm, etype, padata=[padata])
141 self.check_as_reply(rep)
143 enc_part2 = self.get_as_rep_enc_data(key, rep)
144 key = self.EncryptionKey_import(enc_part2['key'])
145 ticket = rep['ticket']
147 # Request a ticket to the ldap service
148 sname = self.PrincipalName_create(
149 name_type=NT_SRV_INST,
150 names=["ldap", samdb.host_dns_name()])
152 (rep, _) = self.tgs_req(
153 cname, sname, uc.get_realm(), ticket, key, etype,
154 service_creds=self.get_dc_creds())
156 self.check_tgs_reply(rep)
158 def test_get_ticket_for_host_service_of_machine_account(self):
160 # Create a user and machine account for the test.
162 samdb = self.get_samdb()
163 user_name = "tsttktusr"
164 (uc, dn) = self.create_account(samdb, user_name)
165 (mc, _) = self.create_account(samdb, "tsttktmac",
166 account_type=self.AccountType.COMPUTER)
167 realm = uc.get_realm().lower()
169 # Do the initial AS-REQ, should get a pre-authentication required
171 etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
172 cname = self.PrincipalName_create(
173 name_type=NT_PRINCIPAL, names=[user_name])
174 sname = self.PrincipalName_create(
175 name_type=NT_SRV_INST, names=["krbtgt", realm])
177 rep = self.as_req(cname, sname, realm, etype)
178 self.check_pre_authentication(rep)
181 padata = self.get_enc_timestamp_pa_data(uc, rep)
182 key = self.get_as_rep_key(uc, rep)
183 rep = self.as_req(cname, sname, realm, etype, padata=[padata])
184 self.check_as_reply(rep)
186 # Request a ticket to the host service on the machine account
187 ticket = rep['ticket']
188 enc_part2 = self.get_as_rep_enc_data(key, rep)
189 key = self.EncryptionKey_import(enc_part2['key'])
190 cname = self.PrincipalName_create(
191 name_type=NT_PRINCIPAL,
193 sname = self.PrincipalName_create(
194 name_type=NT_PRINCIPAL,
195 names=[mc.get_username()])
197 (rep, enc_part) = self.tgs_req(
198 cname, sname, uc.get_realm(), ticket, key, etype,
200 self.check_tgs_reply(rep)
202 # Check the contents of the service ticket
203 ticket = rep['ticket']
204 enc_part = self.decode_service_ticket(mc, ticket)
206 pac_data = self.get_pac_data(enc_part['authorization-data'])
207 sid = self.get_objectSid(samdb, dn)
208 upn = "%s@%s" % (uc.get_username(), realm)
211 str(pac_data.account_name),
212 "rep = {%s},%s" % (rep, pac_data))
216 "rep = {%s},%s" % (rep, pac_data))
219 pac_data.domain_name,
220 "rep = {%s},%s" % (rep, pac_data))
224 "rep = {%s},%s" % (rep, pac_data))
227 pac_data.account_sid,
228 "rep = {%s},%s" % (rep, pac_data))
230 def _make_tgs_request(self, client_creds, service_creds, tgt,
231 pac_request=None, expect_pac=True,
233 expected_account_name=None,
234 expected_upn_name=None,
236 client_account = client_creds.get_username()
237 cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
238 names=[client_account])
240 service_account = service_creds.get_username()
241 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
242 names=[service_account])
244 realm = service_creds.get_realm()
246 expected_crealm = realm
247 expected_cname = cname
248 expected_srealm = realm
249 expected_sname = sname
251 expected_supported_etypes = service_creds.tgs_supported_enctypes
253 etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
255 kdc_options = str(krb5_asn1.KDCOptions('canonicalize'))
257 target_decryption_key = self.TicketDecryptionKey_from_creds(
260 authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
263 expected_error_mode = KDC_ERR_BADOPTION
264 check_error_fn = self.generic_check_kdc_error
267 expected_error_mode = 0
268 check_error_fn = None
269 check_rep_fn = self.generic_check_kdc_rep
271 kdc_exchange_dict = self.tgs_exchange_dict(
272 expected_crealm=expected_crealm,
273 expected_cname=expected_cname,
274 expected_srealm=expected_srealm,
275 expected_sname=expected_sname,
276 expected_account_name=expected_account_name,
277 expected_upn_name=expected_upn_name,
278 expected_sid=expected_sid,
279 expected_supported_etypes=expected_supported_etypes,
280 ticket_decryption_key=target_decryption_key,
281 check_error_fn=check_error_fn,
282 check_rep_fn=check_rep_fn,
283 check_kdc_private_fn=self.generic_check_kdc_private,
284 expected_error_mode=expected_error_mode,
286 authenticator_subkey=authenticator_subkey,
287 kdc_options=kdc_options,
288 pac_request=pac_request,
289 expect_pac=expect_pac)
291 rep = self._generic_kdc_exchange(kdc_exchange_dict,
297 self.check_error_rep(rep, expected_error_mode)
301 self.check_reply(rep, KRB_TGS_REP)
303 return kdc_exchange_dict['rep_ticket_creds']
305 def test_request(self):
306 client_creds = self.get_client_creds()
307 service_creds = self.get_service_creds()
309 tgt = self.get_tgt(client_creds)
311 pac = self.get_ticket_pac(tgt)
312 self.assertIsNotNone(pac)
314 ticket = self._make_tgs_request(client_creds, service_creds, tgt)
316 pac = self.get_ticket_pac(ticket)
317 self.assertIsNotNone(pac)
319 def test_request_no_pac(self):
320 client_creds = self.get_client_creds()
321 service_creds = self.get_service_creds()
323 tgt = self.get_tgt(client_creds, pac_request=False)
325 pac = self.get_ticket_pac(tgt)
326 self.assertIsNotNone(pac)
328 ticket = self._make_tgs_request(client_creds, service_creds, tgt,
329 pac_request=False, expect_pac=False)
331 pac = self.get_ticket_pac(ticket, expect_pac=False)
332 self.assertIsNone(pac)
334 def test_client_no_auth_data_required(self):
335 client_creds = self.get_cached_creds(
336 account_type=self.AccountType.USER,
337 opts={'no_auth_data_required': True})
338 service_creds = self.get_service_creds()
340 tgt = self.get_tgt(client_creds)
342 pac = self.get_ticket_pac(tgt)
343 self.assertIsNotNone(pac)
345 ticket = self._make_tgs_request(client_creds, service_creds, tgt)
347 pac = self.get_ticket_pac(ticket)
348 self.assertIsNotNone(pac)
350 def test_no_pac_client_no_auth_data_required(self):
351 client_creds = self.get_cached_creds(
352 account_type=self.AccountType.USER,
353 opts={'no_auth_data_required': True})
354 service_creds = self.get_service_creds()
356 tgt = self.get_tgt(client_creds)
358 pac = self.get_ticket_pac(tgt)
359 self.assertIsNotNone(pac)
361 ticket = self._make_tgs_request(client_creds, service_creds, tgt,
362 pac_request=False, expect_pac=True)
364 pac = self.get_ticket_pac(ticket)
365 self.assertIsNotNone(pac)
367 def test_service_no_auth_data_required(self):
368 client_creds = self.get_client_creds()
369 service_creds = self.get_cached_creds(
370 account_type=self.AccountType.COMPUTER,
371 opts={'no_auth_data_required': True})
373 tgt = self.get_tgt(client_creds)
375 pac = self.get_ticket_pac(tgt)
376 self.assertIsNotNone(pac)
378 ticket = self._make_tgs_request(client_creds, service_creds, tgt,
381 pac = self.get_ticket_pac(ticket, expect_pac=False)
382 self.assertIsNone(pac)
384 def test_no_pac_service_no_auth_data_required(self):
385 client_creds = self.get_client_creds()
386 service_creds = self.get_cached_creds(
387 account_type=self.AccountType.COMPUTER,
388 opts={'no_auth_data_required': True})
390 tgt = self.get_tgt(client_creds, pac_request=False)
392 pac = self.get_ticket_pac(tgt)
393 self.assertIsNotNone(pac)
395 ticket = self._make_tgs_request(client_creds, service_creds, tgt,
396 pac_request=False, expect_pac=False)
398 pac = self.get_ticket_pac(ticket, expect_pac=False)
399 self.assertIsNone(pac)
401 def test_remove_pac_service_no_auth_data_required(self):
402 client_creds = self.get_client_creds()
403 service_creds = self.get_cached_creds(
404 account_type=self.AccountType.COMPUTER,
405 opts={'no_auth_data_required': True})
407 tgt = self.modified_ticket(self.get_tgt(client_creds),
410 pac = self.get_ticket_pac(tgt, expect_pac=False)
411 self.assertIsNone(pac)
413 self._make_tgs_request(client_creds, service_creds, tgt,
414 expect_pac=False, expect_error=True)
416 def test_remove_pac_client_no_auth_data_required(self):
417 client_creds = self.get_cached_creds(
418 account_type=self.AccountType.USER,
419 opts={'no_auth_data_required': True})
420 service_creds = self.get_service_creds()
422 tgt = self.modified_ticket(self.get_tgt(client_creds),
425 pac = self.get_ticket_pac(tgt, expect_pac=False)
426 self.assertIsNone(pac)
428 self._make_tgs_request(client_creds, service_creds, tgt,
429 expect_pac=False, expect_error=True)
431 def test_remove_pac(self):
432 client_creds = self.get_client_creds()
433 service_creds = self.get_service_creds()
435 tgt = self.modified_ticket(self.get_tgt(client_creds),
438 pac = self.get_ticket_pac(tgt, expect_pac=False)
439 self.assertIsNone(pac)
441 self._make_tgs_request(client_creds, service_creds, tgt,
442 expect_pac=False, expect_error=True)
444 def test_upn_dns_info_ex_user(self):
445 client_creds = self.get_client_creds()
446 self._run_upn_dns_info_ex_test(client_creds)
448 def test_upn_dns_info_ex_mac(self):
449 mach_creds = self.get_mach_creds()
450 self._run_upn_dns_info_ex_test(mach_creds)
452 def test_upn_dns_info_ex_upn_user(self):
453 client_creds = self.get_cached_creds(
454 account_type=self.AccountType.USER,
455 opts={'upn': 'upn_dns_info_test_upn0@bar'})
456 self._run_upn_dns_info_ex_test(client_creds)
458 def test_upn_dns_info_ex_upn_mac(self):
459 mach_creds = self.get_cached_creds(
460 account_type=self.AccountType.COMPUTER,
461 opts={'upn': 'upn_dns_info_test_upn1@bar'})
462 self._run_upn_dns_info_ex_test(mach_creds)
464 def _run_upn_dns_info_ex_test(self, client_creds):
465 service_creds = self.get_service_creds()
467 samdb = self.get_samdb()
468 dn = client_creds.get_dn()
470 account_name = client_creds.get_username()
471 upn_name = client_creds.get_upn()
473 realm = client_creds.get_realm().lower()
474 upn_name = f'{account_name}@{realm}'
475 sid = self.get_objectSid(samdb, dn)
477 tgt = self.get_tgt(client_creds,
478 expected_account_name=account_name,
479 expected_upn_name=upn_name,
482 self._make_tgs_request(client_creds, service_creds, tgt,
483 expected_account_name=account_name,
484 expected_upn_name=upn_name,
487 # Test making a TGS request.
488 def test_tgs_req(self):
489 creds = self._get_creds()
490 tgt = self._get_tgt(creds)
491 self._run_tgs(tgt, expected_error=0)
493 def test_renew_req(self):
494 creds = self._get_creds()
495 tgt = self._get_tgt(creds, renewable=True)
496 self._renew_tgt(tgt, expected_error=0)
498 def test_validate_req(self):
499 creds = self._get_creds()
500 tgt = self._get_tgt(creds, invalid=True)
501 self._validate_tgt(tgt, expected_error=0)
503 def test_s4u2self_req(self):
504 creds = self._get_creds()
505 tgt = self._get_tgt(creds)
506 self._s4u2self(tgt, creds, expected_error=0)
508 def test_user2user_req(self):
509 creds = self._get_creds()
510 tgt = self._get_tgt(creds)
511 self._user2user(tgt, creds, expected_error=0)
513 # Test making a request without a PAC.
514 def test_tgs_no_pac(self):
515 creds = self._get_creds()
516 tgt = self._get_tgt(creds, remove_pac=True)
517 self._run_tgs(tgt, expected_error=KDC_ERR_BADOPTION)
519 def test_renew_no_pac(self):
520 creds = self._get_creds()
521 tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
522 self._renew_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
524 def test_validate_no_pac(self):
525 creds = self._get_creds()
526 tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
527 self._validate_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
529 def test_s4u2self_no_pac(self):
530 creds = self._get_creds()
531 tgt = self._get_tgt(creds, remove_pac=True)
532 self._s4u2self(tgt, creds,
533 expected_error=(KDC_ERR_GENERIC, KDC_ERR_BADOPTION),
534 expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
537 def test_user2user_no_pac(self):
538 creds = self._get_creds()
539 tgt = self._get_tgt(creds, remove_pac=True)
540 self._user2user(tgt, creds, expected_error=KDC_ERR_BADOPTION)
542 # Test making a request with authdata and without a PAC.
543 def test_tgs_authdata_no_pac(self):
544 creds = self._get_creds()
545 tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
546 self._run_tgs(tgt, expected_error=KDC_ERR_BADOPTION)
548 def test_renew_authdata_no_pac(self):
549 creds = self._get_creds()
550 tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
551 allow_empty_authdata=True)
552 self._renew_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
554 def test_validate_authdata_no_pac(self):
555 creds = self._get_creds()
556 tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
557 allow_empty_authdata=True)
558 self._validate_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
560 def test_s4u2self_authdata_no_pac(self):
561 creds = self._get_creds()
562 tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
563 self._s4u2self(tgt, creds,
564 expected_error=(KDC_ERR_GENERIC, KDC_ERR_BADOPTION),
565 expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
568 def test_user2user_authdata_no_pac(self):
569 creds = self._get_creds()
570 tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
571 self._user2user(tgt, creds, expected_error=KDC_ERR_BADOPTION)
573 # Test changing the SID in the PAC to that of another account.
574 def test_tgs_sid_mismatch_existing(self):
575 creds = self._get_creds()
576 existing_rid = self._get_existing_rid()
577 tgt = self._get_tgt(creds, new_rid=existing_rid)
578 self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
580 def test_renew_sid_mismatch_existing(self):
581 creds = self._get_creds()
582 existing_rid = self._get_existing_rid()
583 tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
584 self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
586 def test_validate_sid_mismatch_existing(self):
587 creds = self._get_creds()
588 existing_rid = self._get_existing_rid()
589 tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
590 self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
592 def test_s4u2self_sid_mismatch_existing(self):
593 creds = self._get_creds()
594 existing_rid = self._get_existing_rid()
595 tgt = self._get_tgt(creds, new_rid=existing_rid)
596 self._s4u2self(tgt, creds,
597 expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
599 def test_user2user_sid_mismatch_existing(self):
600 creds = self._get_creds()
601 existing_rid = self._get_existing_rid()
602 tgt = self._get_tgt(creds, new_rid=existing_rid)
603 self._user2user(tgt, creds,
604 expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
606 # Test changing the SID in the PAC to a non-existent one.
607 def test_tgs_sid_mismatch_nonexisting(self):
608 creds = self._get_creds()
609 nonexistent_rid = self._get_non_existent_rid()
610 tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
611 self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
613 def test_renew_sid_mismatch_nonexisting(self):
614 creds = self._get_creds()
615 nonexistent_rid = self._get_non_existent_rid()
616 tgt = self._get_tgt(creds, renewable=True,
617 new_rid=nonexistent_rid)
618 self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
620 def test_validate_sid_mismatch_nonexisting(self):
621 creds = self._get_creds()
622 nonexistent_rid = self._get_non_existent_rid()
623 tgt = self._get_tgt(creds, invalid=True,
624 new_rid=nonexistent_rid)
625 self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
627 def test_s4u2self_sid_mismatch_nonexisting(self):
628 creds = self._get_creds()
629 nonexistent_rid = self._get_non_existent_rid()
630 tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
631 self._s4u2self(tgt, creds,
632 expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
634 def test_user2user_sid_mismatch_nonexisting(self):
635 creds = self._get_creds()
636 nonexistent_rid = self._get_non_existent_rid()
637 tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
638 self._user2user(tgt, creds,
639 expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
641 # Test with an RODC-issued ticket where the client is revealed to the RODC.
642 def test_tgs_rodc_revealed(self):
643 creds = self._get_creds(replication_allowed=True,
644 revealed_to_rodc=True)
645 tgt = self._get_tgt(creds, from_rodc=True)
646 self._run_tgs(tgt, expected_error=0)
648 def test_renew_rodc_revealed(self):
649 creds = self._get_creds(replication_allowed=True,
650 revealed_to_rodc=True)
651 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
652 self._renew_tgt(tgt, expected_error=0)
654 def test_validate_rodc_revealed(self):
655 creds = self._get_creds(replication_allowed=True,
656 revealed_to_rodc=True)
657 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
658 self._validate_tgt(tgt, expected_error=0)
660 def test_s4u2self_rodc_revealed(self):
661 creds = self._get_creds(replication_allowed=True,
662 revealed_to_rodc=True)
663 tgt = self._get_tgt(creds, from_rodc=True)
664 self._s4u2self(tgt, creds, expected_error=0)
666 def test_user2user_rodc_revealed(self):
667 creds = self._get_creds(replication_allowed=True,
668 revealed_to_rodc=True)
669 tgt = self._get_tgt(creds, from_rodc=True)
670 self._user2user(tgt, creds, expected_error=0)
672 # Test with an RODC-issued ticket where the SID in the PAC is changed to
673 # that of another account.
674 def test_tgs_rodc_sid_mismatch_existing(self):
675 creds = self._get_creds(replication_allowed=True,
676 revealed_to_rodc=True)
677 existing_rid = self._get_existing_rid(replication_allowed=True,
678 revealed_to_rodc=True)
679 tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
680 self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
682 def test_renew_rodc_sid_mismatch_existing(self):
683 creds = self._get_creds(replication_allowed=True,
684 revealed_to_rodc=True)
685 existing_rid = self._get_existing_rid(replication_allowed=True,
686 revealed_to_rodc=True)
687 tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
688 new_rid=existing_rid)
689 self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
691 def test_validate_rodc_sid_mismatch_existing(self):
692 creds = self._get_creds(replication_allowed=True,
693 revealed_to_rodc=True)
694 existing_rid = self._get_existing_rid(replication_allowed=True,
695 revealed_to_rodc=True)
696 tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
697 new_rid=existing_rid)
698 self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
700 def test_s4u2self_rodc_sid_mismatch_existing(self):
701 creds = self._get_creds(replication_allowed=True,
702 revealed_to_rodc=True)
703 existing_rid = self._get_existing_rid(replication_allowed=True,
704 revealed_to_rodc=True)
705 tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
706 self._s4u2self(tgt, creds, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
708 def test_user2user_rodc_sid_mismatch_existing(self):
709 creds = self._get_creds(replication_allowed=True,
710 revealed_to_rodc=True)
711 existing_rid = self._get_existing_rid(replication_allowed=True,
712 revealed_to_rodc=True)
713 tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
714 self._user2user(tgt, creds,
715 expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
717 # Test with an RODC-issued ticket where the SID in the PAC is changed to a
719 def test_tgs_rodc_sid_mismatch_nonexisting(self):
720 creds = self._get_creds(replication_allowed=True,
721 revealed_to_rodc=True)
722 nonexistent_rid = self._get_non_existent_rid()
723 tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
724 self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
726 def test_renew_rodc_sid_mismatch_nonexisting(self):
727 creds = self._get_creds(replication_allowed=True,
728 revealed_to_rodc=True)
729 nonexistent_rid = self._get_non_existent_rid()
730 tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
731 new_rid=nonexistent_rid)
732 self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
734 def test_validate_rodc_sid_mismatch_nonexisting(self):
735 creds = self._get_creds(replication_allowed=True,
736 revealed_to_rodc=True)
737 nonexistent_rid = self._get_non_existent_rid()
738 tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
739 new_rid=nonexistent_rid)
740 self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
742 def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
743 creds = self._get_creds(replication_allowed=True,
744 revealed_to_rodc=True)
745 nonexistent_rid = self._get_non_existent_rid()
746 tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
747 self._s4u2self(tgt, creds, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
749 def test_user2user_rodc_sid_mismatch_nonexisting(self):
750 creds = self._get_creds(replication_allowed=True,
751 revealed_to_rodc=True)
752 nonexistent_rid = self._get_non_existent_rid()
753 tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
754 self._user2user(tgt, creds,
755 expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
757 # Test with an RODC-issued ticket where the client is not revealed to the
759 def test_tgs_rodc_not_revealed(self):
760 creds = self._get_creds(replication_allowed=True)
761 tgt = self._get_tgt(creds, from_rodc=True)
763 self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
765 def test_renew_rodc_not_revealed(self):
766 creds = self._get_creds(replication_allowed=True)
767 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
768 self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
770 def test_validate_rodc_not_revealed(self):
771 creds = self._get_creds(replication_allowed=True)
772 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
773 self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
775 def test_s4u2self_rodc_not_revealed(self):
776 creds = self._get_creds(replication_allowed=True)
777 tgt = self._get_tgt(creds, from_rodc=True)
778 self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
780 def test_user2user_rodc_not_revealed(self):
781 creds = self._get_creds(replication_allowed=True)
782 tgt = self._get_tgt(creds, from_rodc=True)
783 self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
785 # Test with an RODC-issued ticket where the RODC account does not have the
786 # PARTIAL_SECRETS bit set.
787 def test_tgs_rodc_no_partial_secrets(self):
788 creds = self._get_creds(replication_allowed=True,
789 revealed_to_rodc=True)
790 tgt = self._get_tgt(creds, from_rodc=True)
791 self._remove_rodc_partial_secrets()
792 self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
794 def test_renew_rodc_no_partial_secrets(self):
795 creds = self._get_creds(replication_allowed=True,
796 revealed_to_rodc=True)
797 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
798 self._remove_rodc_partial_secrets()
799 self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
801 def test_validate_rodc_no_partial_secrets(self):
802 creds = self._get_creds(replication_allowed=True,
803 revealed_to_rodc=True)
804 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
805 self._remove_rodc_partial_secrets()
806 self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
808 def test_s4u2self_rodc_no_partial_secrets(self):
809 creds = self._get_creds(replication_allowed=True,
810 revealed_to_rodc=True)
811 tgt = self._get_tgt(creds, from_rodc=True)
812 self._remove_rodc_partial_secrets()
813 self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
815 def test_user2user_rodc_no_partial_secrets(self):
816 creds = self._get_creds(replication_allowed=True,
817 revealed_to_rodc=True)
818 tgt = self._get_tgt(creds, from_rodc=True)
819 self._remove_rodc_partial_secrets()
820 self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
822 # Test with an RODC-issued ticket where the RODC account does not have an
824 def test_tgs_rodc_no_krbtgt_link(self):
825 creds = self._get_creds(replication_allowed=True,
826 revealed_to_rodc=True)
827 tgt = self._get_tgt(creds, from_rodc=True)
828 self._remove_rodc_krbtgt_link()
829 self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
831 def test_renew_rodc_no_krbtgt_link(self):
832 creds = self._get_creds(replication_allowed=True,
833 revealed_to_rodc=True)
834 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
835 self._remove_rodc_krbtgt_link()
836 self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
838 def test_validate_rodc_no_krbtgt_link(self):
839 creds = self._get_creds(replication_allowed=True,
840 revealed_to_rodc=True)
841 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
842 self._remove_rodc_krbtgt_link()
843 self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
845 def test_s4u2self_rodc_no_krbtgt_link(self):
846 creds = self._get_creds(replication_allowed=True,
847 revealed_to_rodc=True)
848 tgt = self._get_tgt(creds, from_rodc=True)
849 self._remove_rodc_krbtgt_link()
850 self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
852 def test_user2user_rodc_no_krbtgt_link(self):
853 creds = self._get_creds(replication_allowed=True,
854 revealed_to_rodc=True)
855 tgt = self._get_tgt(creds, from_rodc=True)
856 self._remove_rodc_krbtgt_link()
857 self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
859 # Test with an RODC-issued ticket where the client is not allowed to
860 # replicate to the RODC.
861 def test_tgs_rodc_not_allowed(self):
862 creds = self._get_creds(revealed_to_rodc=True)
863 tgt = self._get_tgt(creds, from_rodc=True)
864 self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
866 def test_renew_rodc_not_allowed(self):
867 creds = self._get_creds(revealed_to_rodc=True)
868 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
869 self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
871 def test_validate_rodc_not_allowed(self):
872 creds = self._get_creds(revealed_to_rodc=True)
873 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
874 self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
876 def test_s4u2self_rodc_not_allowed(self):
877 creds = self._get_creds(revealed_to_rodc=True)
878 tgt = self._get_tgt(creds, from_rodc=True)
879 self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
881 def test_user2user_rodc_not_allowed(self):
882 creds = self._get_creds(revealed_to_rodc=True)
883 tgt = self._get_tgt(creds, from_rodc=True)
884 self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
886 # Test with an RODC-issued ticket where the client is denied from
887 # replicating to the RODC.
888 def test_tgs_rodc_denied(self):
889 creds = self._get_creds(replication_denied=True,
890 revealed_to_rodc=True)
891 tgt = self._get_tgt(creds, from_rodc=True)
892 self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
894 def test_renew_rodc_denied(self):
895 creds = self._get_creds(replication_denied=True,
896 revealed_to_rodc=True)
897 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
898 self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
900 def test_validate_rodc_denied(self):
901 creds = self._get_creds(replication_denied=True,
902 revealed_to_rodc=True)
903 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
904 self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
906 def test_s4u2self_rodc_denied(self):
907 creds = self._get_creds(replication_denied=True,
908 revealed_to_rodc=True)
909 tgt = self._get_tgt(creds, from_rodc=True)
910 self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
912 def test_user2user_rodc_denied(self):
913 creds = self._get_creds(replication_denied=True,
914 revealed_to_rodc=True)
915 tgt = self._get_tgt(creds, from_rodc=True)
916 self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
918 # Test with an RODC-issued ticket where the client is both allowed and
919 # denied replicating to the RODC.
920 def test_tgs_rodc_allowed_denied(self):
921 creds = self._get_creds(replication_allowed=True,
922 replication_denied=True,
923 revealed_to_rodc=True)
924 tgt = self._get_tgt(creds, from_rodc=True)
925 self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
927 def test_renew_rodc_allowed_denied(self):
928 creds = self._get_creds(replication_allowed=True,
929 replication_denied=True,
930 revealed_to_rodc=True)
931 tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
932 self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
934 def test_validate_rodc_allowed_denied(self):
935 creds = self._get_creds(replication_allowed=True,
936 replication_denied=True,
937 revealed_to_rodc=True)
938 tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
939 self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
941 def test_s4u2self_rodc_allowed_denied(self):
942 creds = self._get_creds(replication_allowed=True,
943 replication_denied=True,
944 revealed_to_rodc=True)
945 tgt = self._get_tgt(creds, from_rodc=True)
946 self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
948 def test_user2user_rodc_allowed_denied(self):
949 creds = self._get_creds(replication_allowed=True,
950 replication_denied=True,
951 revealed_to_rodc=True)
952 tgt = self._get_tgt(creds, from_rodc=True)
953 self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
955 # Test user-to-user with incorrect service principal names.
956 def test_user2user_matching_sname_host(self):
957 creds = self._get_creds()
958 tgt = self._get_tgt(creds)
960 user_name = creds.get_username()
961 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
962 names=['host', user_name])
964 self._user2user(tgt, creds, sname=sname,
965 expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
967 def test_user2user_matching_sname_no_host(self):
968 creds = self._get_creds()
969 tgt = self._get_tgt(creds)
971 user_name = creds.get_username()
972 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
975 self._user2user(tgt, creds, sname=sname, expected_error=0)
977 def test_user2user_wrong_sname(self):
978 creds = self._get_creds()
979 tgt = self._get_tgt(creds)
981 other_creds = self._get_mach_creds()
982 user_name = other_creds.get_username()
983 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
986 self._user2user(tgt, creds, sname=sname,
987 expected_error=(KDC_ERR_BADMATCH,
990 def test_user2user_non_existent_sname(self):
991 creds = self._get_creds()
992 tgt = self._get_tgt(creds)
994 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
995 names=['host', 'non_existent_user'])
997 self._user2user(tgt, creds, sname=sname,
998 expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
1000 def test_user2user_service_ticket(self):
1001 creds = self._get_creds()
1002 tgt = self._get_tgt(creds)
1004 service_creds = self.get_service_creds()
1005 service_ticket = self.get_service_ticket(tgt, service_creds)
1007 self._user2user(service_ticket, creds,
1008 expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1017 allow_empty_authdata=False):
1018 self.assertFalse(renewable and invalid)
1021 self.assertIsNone(new_rid)
1023 tgt = self.get_tgt(client_creds)
1026 krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
1028 krbtgt_creds = self.get_krbtgt_creds()
1030 if new_rid is not None:
1031 def change_sid_fn(pac):
1032 for pac_buffer in pac.buffers:
1033 if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
1034 logon_info = pac_buffer.info.info
1036 logon_info.info3.base.rid = new_rid
1040 modify_pac_fn = change_sid_fn
1042 modify_pac_fn = None
1044 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1047 checksum_keys = None
1050 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
1054 def set_renewable(enc_part):
1055 # Set the renewable flag.
1056 renewable_flag = krb5_asn1.TicketFlags('renewable')
1057 pos = len(tuple(renewable_flag)) - 1
1059 flags = enc_part['flags']
1060 self.assertLessEqual(pos, len(flags))
1062 new_flags = flags[:pos] + '1' + flags[pos + 1:]
1063 enc_part['flags'] = new_flags
1065 # Set the renew-till time to be in the future.
1066 renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
1067 enc_part['renew-till'] = renew_till
1071 modify_fn = set_renewable
1073 def set_invalid(enc_part):
1074 # Set the invalid flag.
1075 invalid_flag = krb5_asn1.TicketFlags('invalid')
1076 pos = len(tuple(invalid_flag)) - 1
1078 flags = enc_part['flags']
1079 self.assertLessEqual(pos, len(flags))
1081 new_flags = flags[:pos] + '1' + flags[pos + 1:]
1082 enc_part['flags'] = new_flags
1084 # Set the ticket start time to be in the past.
1085 past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
1086 enc_part['starttime'] = past_time
1090 modify_fn = set_invalid
1094 return self.modified_ticket(
1096 new_ticket_key=krbtgt_key,
1097 modify_fn=modify_fn,
1098 modify_pac_fn=modify_pac_fn,
1099 exclude_pac=remove_pac,
1100 allow_empty_authdata=allow_empty_authdata,
1101 update_pac_checksums=not remove_pac,
1102 checksum_keys=checksum_keys)
1104 def _remove_rodc_partial_secrets(self):
1105 samdb = self.get_samdb()
1107 rodc_ctx = self.get_mock_rodc_ctx()
1108 rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
1110 def add_rodc_partial_secrets():
1113 msg['userAccountControl'] = ldb.MessageElement(
1114 str(rodc_ctx.userAccountControl),
1115 ldb.FLAG_MOD_REPLACE,
1116 'userAccountControl')
1119 self.addCleanup(add_rodc_partial_secrets)
1121 uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
1125 msg['userAccountControl'] = ldb.MessageElement(
1127 ldb.FLAG_MOD_REPLACE,
1128 'userAccountControl')
1131 def _remove_rodc_krbtgt_link(self):
1132 samdb = self.get_samdb()
1134 rodc_ctx = self.get_mock_rodc_ctx()
1135 rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
1137 def add_rodc_krbtgt_link():
1140 msg['msDS-KrbTgtLink'] = ldb.MessageElement(
1141 rodc_ctx.new_krbtgt_dn,
1146 self.addCleanup(add_rodc_krbtgt_link)
1150 msg['msDS-KrbTgtLink'] = ldb.MessageElement(
1152 ldb.FLAG_MOD_DELETE,
1156 def _get_creds(self,
1157 replication_allowed=False,
1158 replication_denied=False,
1159 revealed_to_rodc=False):
1160 return self.get_cached_creds(
1161 account_type=self.AccountType.COMPUTER,
1163 'allowed_replication_mock': replication_allowed,
1164 'denied_replication_mock': replication_denied,
1165 'revealed_to_mock_rodc': revealed_to_rodc,
1169 def _get_existing_rid(self,
1170 replication_allowed=False,
1171 replication_denied=False,
1172 revealed_to_rodc=False):
1173 other_creds = self.get_cached_creds(
1174 account_type=self.AccountType.COMPUTER,
1176 'allowed_replication_mock': replication_allowed,
1177 'denied_replication_mock': replication_denied,
1178 'revealed_to_mock_rodc': revealed_to_rodc,
1182 samdb = self.get_samdb()
1184 other_dn = other_creds.get_dn()
1185 other_sid = self.get_objectSid(samdb, other_dn)
1187 other_rid = int(other_sid.rsplit('-', 1)[1])
1191 def _get_mach_creds(self):
1192 return self.get_cached_creds(
1193 account_type=self.AccountType.COMPUTER,
1195 'allowed_replication_mock': True,
1196 'denied_replication_mock': False,
1197 'revealed_to_mock_rodc': True,
1201 def _get_non_existent_rid(self):
1202 return (1 << 30) - 1
1204 def _run_tgs(self, tgt, expected_error):
1205 target_creds = self.get_service_creds()
1206 self._tgs_req(tgt, expected_error, target_creds)
1208 def _renew_tgt(self, tgt, expected_error):
1209 krbtgt_creds = self.get_krbtgt_creds()
1210 kdc_options = str(krb5_asn1.KDCOptions('renew'))
1211 self._tgs_req(tgt, expected_error, krbtgt_creds,
1212 kdc_options=kdc_options)
1214 def _validate_tgt(self, tgt, expected_error):
1215 krbtgt_creds = self.get_krbtgt_creds()
1216 kdc_options = str(krb5_asn1.KDCOptions('validate'))
1217 self._tgs_req(tgt, expected_error, krbtgt_creds,
1218 kdc_options=kdc_options)
1220 def _s4u2self(self, tgt, tgt_creds, expected_error,
1221 expect_edata=False, expected_status=None):
1222 user_creds = self._get_mach_creds()
1224 user_name = user_creds.get_username()
1225 user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1227 user_realm = user_creds.get_realm()
1229 def generate_s4u2self_padata(_kdc_exchange_dict,
1232 padata = self.PA_S4U2Self_create(
1235 tgt_session_key=tgt.session_key,
1238 return [padata], req_body
1240 return self._tgs_req(tgt, expected_error, tgt_creds,
1241 expected_cname=user_cname,
1242 generate_padata_fn=generate_s4u2self_padata,
1243 expect_claims=False, expect_edata=expect_edata,
1244 expected_status=expected_status)
1246 def _user2user(self, tgt, tgt_creds, expected_error, sname=None):
1247 user_creds = self._get_mach_creds()
1248 user_tgt = self.get_tgt(user_creds)
1250 kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
1251 self._tgs_req(user_tgt, expected_error, tgt_creds,
1252 kdc_options=kdc_options,
1253 additional_ticket=tgt,
1256 def _tgs_req(self, tgt, expected_error, target_creds,
1258 expected_cname=None,
1259 additional_ticket=None,
1260 generate_padata_fn=None,
1264 expected_status=None):
1265 srealm = target_creds.get_realm()
1268 target_name = target_creds.get_username()
1269 if target_name == 'krbtgt':
1270 sname = self.PrincipalName_create(name_type=NT_SRV_INST,
1271 names=[target_name, srealm])
1273 if target_name[-1] == '$':
1274 target_name = target_name[:-1]
1275 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1276 names=['host', target_name])
1278 if additional_ticket is not None:
1279 additional_tickets = [additional_ticket.ticket]
1280 decryption_key = additional_ticket.session_key
1282 additional_tickets = None
1283 decryption_key = self.TicketDecryptionKey_from_creds(
1286 subkey = self.RandomKey(tgt.session_key.etype)
1288 etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1291 check_error_fn = self.generic_check_kdc_error
1294 check_error_fn = None
1295 check_rep_fn = self.generic_check_kdc_rep
1297 if expected_cname is None:
1298 expected_cname = tgt.cname
1300 kdc_exchange_dict = self.tgs_exchange_dict(
1301 expected_crealm=tgt.crealm,
1302 expected_cname=expected_cname,
1303 expected_srealm=srealm,
1304 expected_sname=sname,
1305 ticket_decryption_key=decryption_key,
1306 generate_padata_fn=generate_padata_fn,
1307 check_error_fn=check_error_fn,
1308 check_rep_fn=check_rep_fn,
1309 check_kdc_private_fn=self.generic_check_kdc_private,
1310 expected_error_mode=expected_error,
1311 expected_status=expected_status,
1313 authenticator_subkey=subkey,
1314 kdc_options=kdc_options,
1315 expect_edata=expect_edata,
1316 expect_claims=expect_claims)
1318 rep = self._generic_kdc_exchange(kdc_exchange_dict,
1323 additional_tickets=additional_tickets)
1325 self.check_error_rep(rep, expected_error)
1328 self.check_reply(rep, KRB_TGS_REP)
1329 return kdc_exchange_dict['rep_ticket_creds']
1332 if __name__ == "__main__":
1333 global_asn1_print = False
1334 global_hexdump = False