CVE-2020-25719 tests/krb5: tests/krb5: Adjust expected error code for S4U2Self no...
[samba.git] / python / samba / tests / krb5 / kdc_tgs_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) 2020 Catalyst.Net Ltd
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 import ldb
24
25
26 from samba import dsdb, ntstatus
27
28 from samba.dcerpc import krb5pac
29
30 sys.path.insert(0, "bin/python")
31 os.environ["PYTHONUNBUFFERED"] = "1"
32
33 import samba.tests.krb5.kcrypto as kcrypto
34 from samba.tests.krb5.kdc_base_test import KDCBaseTest
35 from samba.tests.krb5.rfc4120_constants import (
36     AES256_CTS_HMAC_SHA1_96,
37     ARCFOUR_HMAC_MD5,
38     KRB_ERROR,
39     KRB_TGS_REP,
40     KDC_ERR_BADMATCH,
41     KDC_ERR_BADOPTION,
42     KDC_ERR_CLIENT_NAME_MISMATCH,
43     KDC_ERR_GENERIC,
44     KDC_ERR_MODIFIED,
45     KDC_ERR_POLICY,
46     KDC_ERR_S_PRINCIPAL_UNKNOWN,
47     KDC_ERR_TGT_REVOKED,
48     NT_PRINCIPAL,
49     NT_SRV_INST,
50 )
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
52
# Module-level debug switches.  KdcTgsTests.setUp() copies them onto each
# test instance as do_asn1_print and do_hexdump respectively.
global_asn1_print = False
global_hexdump = False
55
56
57 class KdcTgsTests(KDCBaseTest):
58
59     def setUp(self):
60         super().setUp()
61         self.do_asn1_print = global_asn1_print
62         self.do_hexdump = global_hexdump
63
64     def test_tgs_req_cname_does_not_not_match_authenticator_cname(self):
65         ''' Try and obtain a ticket from the TGS, but supply a cname
66             that differs from that provided to the krbtgt
67         '''
68         # Create the user account
69         samdb = self.get_samdb()
70         user_name = "tsttktusr"
71         (uc, _) = self.create_account(samdb, user_name)
72         realm = uc.get_realm().lower()
73
74         # Do the initial AS-REQ, should get a pre-authentication required
75         # response
76         etype = (AES256_CTS_HMAC_SHA1_96,)
77         cname = self.PrincipalName_create(
78             name_type=NT_PRINCIPAL, names=[user_name])
79         sname = self.PrincipalName_create(
80             name_type=NT_SRV_INST, names=["krbtgt", realm])
81
82         rep = self.as_req(cname, sname, realm, etype)
83         self.check_pre_authentication(rep)
84
85         # Do the next AS-REQ
86         padata = self.get_enc_timestamp_pa_data(uc, rep)
87         key = self.get_as_rep_key(uc, rep)
88         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
89         self.check_as_reply(rep)
90
91         # Request a service ticket, but use a cname that does not match
92         # that in the original AS-REQ
93         enc_part2 = self.get_as_rep_enc_data(key, rep)
94         key = self.EncryptionKey_import(enc_part2['key'])
95         ticket = rep['ticket']
96
97         cname = self.PrincipalName_create(
98             name_type=NT_PRINCIPAL,
99             names=["Administrator"])
100         sname = self.PrincipalName_create(
101             name_type=NT_PRINCIPAL,
102             names=["host", samdb.host_dns_name()])
103
104         (rep, enc_part) = self.tgs_req(cname, sname, realm, ticket, key, etype,
105                                        expected_error_mode=KDC_ERR_BADMATCH,
106                                        expect_edata=False)
107
108         self.assertIsNone(
109             enc_part,
110             "rep = {%s}, enc_part = {%s}" % (rep, enc_part))
111         self.assertEqual(KRB_ERROR, rep['msg-type'], "rep = {%s}" % rep)
112         self.assertEqual(
113             KDC_ERR_BADMATCH,
114             rep['error-code'],
115             "rep = {%s}" % rep)
116
117     def test_ldap_service_ticket(self):
118         '''Get a ticket to the ldap service
119         '''
120         # Create the user account
121         samdb = self.get_samdb()
122         user_name = "tsttktusr"
123         (uc, _) = self.create_account(samdb, user_name)
124         realm = uc.get_realm().lower()
125
126         # Do the initial AS-REQ, should get a pre-authentication required
127         # response
128         etype = (AES256_CTS_HMAC_SHA1_96,)
129         cname = self.PrincipalName_create(
130             name_type=NT_PRINCIPAL, names=[user_name])
131         sname = self.PrincipalName_create(
132             name_type=NT_SRV_INST, names=["krbtgt", realm])
133
134         rep = self.as_req(cname, sname, realm, etype)
135         self.check_pre_authentication(rep)
136
137         # Do the next AS-REQ
138         padata = self.get_enc_timestamp_pa_data(uc, rep)
139         key = self.get_as_rep_key(uc, rep)
140         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
141         self.check_as_reply(rep)
142
143         enc_part2 = self.get_as_rep_enc_data(key, rep)
144         key = self.EncryptionKey_import(enc_part2['key'])
145         ticket = rep['ticket']
146
147         # Request a ticket to the ldap service
148         sname = self.PrincipalName_create(
149             name_type=NT_SRV_INST,
150             names=["ldap", samdb.host_dns_name()])
151
152         (rep, _) = self.tgs_req(
153             cname, sname, uc.get_realm(), ticket, key, etype,
154             service_creds=self.get_dc_creds())
155
156         self.check_tgs_reply(rep)
157
158     def test_get_ticket_for_host_service_of_machine_account(self):
159
160         # Create a user and machine account for the test.
161         #
162         samdb = self.get_samdb()
163         user_name = "tsttktusr"
164         (uc, dn) = self.create_account(samdb, user_name)
165         (mc, _) = self.create_account(samdb, "tsttktmac",
166                                       account_type=self.AccountType.COMPUTER)
167         realm = uc.get_realm().lower()
168
169         # Do the initial AS-REQ, should get a pre-authentication required
170         # response
171         etype = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
172         cname = self.PrincipalName_create(
173             name_type=NT_PRINCIPAL, names=[user_name])
174         sname = self.PrincipalName_create(
175             name_type=NT_SRV_INST, names=["krbtgt", realm])
176
177         rep = self.as_req(cname, sname, realm, etype)
178         self.check_pre_authentication(rep)
179
180         # Do the next AS-REQ
181         padata = self.get_enc_timestamp_pa_data(uc, rep)
182         key = self.get_as_rep_key(uc, rep)
183         rep = self.as_req(cname, sname, realm, etype, padata=[padata])
184         self.check_as_reply(rep)
185
186         # Request a ticket to the host service on the machine account
187         ticket = rep['ticket']
188         enc_part2 = self.get_as_rep_enc_data(key, rep)
189         key = self.EncryptionKey_import(enc_part2['key'])
190         cname = self.PrincipalName_create(
191             name_type=NT_PRINCIPAL,
192             names=[user_name])
193         sname = self.PrincipalName_create(
194             name_type=NT_PRINCIPAL,
195             names=[mc.get_username()])
196
197         (rep, enc_part) = self.tgs_req(
198             cname, sname, uc.get_realm(), ticket, key, etype,
199             service_creds=mc)
200         self.check_tgs_reply(rep)
201
202         # Check the contents of the service ticket
203         ticket = rep['ticket']
204         enc_part = self.decode_service_ticket(mc, ticket)
205
206         pac_data = self.get_pac_data(enc_part['authorization-data'])
207         sid = self.get_objectSid(samdb, dn)
208         upn = "%s@%s" % (uc.get_username(), realm)
209         self.assertEqual(
210             uc.get_username(),
211             str(pac_data.account_name),
212             "rep = {%s},%s" % (rep, pac_data))
213         self.assertEqual(
214             uc.get_username(),
215             pac_data.logon_name,
216             "rep = {%s},%s" % (rep, pac_data))
217         self.assertEqual(
218             uc.get_realm(),
219             pac_data.domain_name,
220             "rep = {%s},%s" % (rep, pac_data))
221         self.assertEqual(
222             upn,
223             pac_data.upn,
224             "rep = {%s},%s" % (rep, pac_data))
225         self.assertEqual(
226             sid,
227             pac_data.account_sid,
228             "rep = {%s},%s" % (rep, pac_data))
229
230     def _make_tgs_request(self, client_creds, service_creds, tgt,
231                           pac_request=None, expect_pac=True,
232                           expect_error=False,
233                           expected_account_name=None,
234                           expected_upn_name=None,
235                           expected_sid=None):
236         client_account = client_creds.get_username()
237         cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
238                                           names=[client_account])
239
240         service_account = service_creds.get_username()
241         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
242                                           names=[service_account])
243
244         realm = service_creds.get_realm()
245
246         expected_crealm = realm
247         expected_cname = cname
248         expected_srealm = realm
249         expected_sname = sname
250
251         expected_supported_etypes = service_creds.tgs_supported_enctypes
252
253         etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
254
255         kdc_options = str(krb5_asn1.KDCOptions('canonicalize'))
256
257         target_decryption_key = self.TicketDecryptionKey_from_creds(
258             service_creds)
259
260         authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
261
262         if expect_error:
263             expected_error_mode = KDC_ERR_BADOPTION
264             check_error_fn = self.generic_check_kdc_error
265             check_rep_fn = None
266         else:
267             expected_error_mode = 0
268             check_error_fn = None
269             check_rep_fn = self.generic_check_kdc_rep
270
271         kdc_exchange_dict = self.tgs_exchange_dict(
272             expected_crealm=expected_crealm,
273             expected_cname=expected_cname,
274             expected_srealm=expected_srealm,
275             expected_sname=expected_sname,
276             expected_account_name=expected_account_name,
277             expected_upn_name=expected_upn_name,
278             expected_sid=expected_sid,
279             expected_supported_etypes=expected_supported_etypes,
280             ticket_decryption_key=target_decryption_key,
281             check_error_fn=check_error_fn,
282             check_rep_fn=check_rep_fn,
283             check_kdc_private_fn=self.generic_check_kdc_private,
284             expected_error_mode=expected_error_mode,
285             tgt=tgt,
286             authenticator_subkey=authenticator_subkey,
287             kdc_options=kdc_options,
288             pac_request=pac_request,
289             expect_pac=expect_pac)
290
291         rep = self._generic_kdc_exchange(kdc_exchange_dict,
292                                          cname=cname,
293                                          realm=realm,
294                                          sname=sname,
295                                          etypes=etypes)
296         if expect_error:
297             self.check_error_rep(rep, expected_error_mode)
298
299             return None
300         else:
301             self.check_reply(rep, KRB_TGS_REP)
302
303             return kdc_exchange_dict['rep_ticket_creds']
304
305     def test_request(self):
306         client_creds = self.get_client_creds()
307         service_creds = self.get_service_creds()
308
309         tgt = self.get_tgt(client_creds)
310
311         pac = self.get_ticket_pac(tgt)
312         self.assertIsNotNone(pac)
313
314         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
315
316         pac = self.get_ticket_pac(ticket)
317         self.assertIsNotNone(pac)
318
319     def test_request_no_pac(self):
320         client_creds = self.get_client_creds()
321         service_creds = self.get_service_creds()
322
323         tgt = self.get_tgt(client_creds, pac_request=False)
324
325         pac = self.get_ticket_pac(tgt)
326         self.assertIsNotNone(pac)
327
328         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
329                                         pac_request=False, expect_pac=False)
330
331         pac = self.get_ticket_pac(ticket, expect_pac=False)
332         self.assertIsNone(pac)
333
334     def test_client_no_auth_data_required(self):
335         client_creds = self.get_cached_creds(
336             account_type=self.AccountType.USER,
337             opts={'no_auth_data_required': True})
338         service_creds = self.get_service_creds()
339
340         tgt = self.get_tgt(client_creds)
341
342         pac = self.get_ticket_pac(tgt)
343         self.assertIsNotNone(pac)
344
345         ticket = self._make_tgs_request(client_creds, service_creds, tgt)
346
347         pac = self.get_ticket_pac(ticket)
348         self.assertIsNotNone(pac)
349
350     def test_no_pac_client_no_auth_data_required(self):
351         client_creds = self.get_cached_creds(
352             account_type=self.AccountType.USER,
353             opts={'no_auth_data_required': True})
354         service_creds = self.get_service_creds()
355
356         tgt = self.get_tgt(client_creds)
357
358         pac = self.get_ticket_pac(tgt)
359         self.assertIsNotNone(pac)
360
361         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
362                                         pac_request=False, expect_pac=True)
363
364         pac = self.get_ticket_pac(ticket)
365         self.assertIsNotNone(pac)
366
367     def test_service_no_auth_data_required(self):
368         client_creds = self.get_client_creds()
369         service_creds = self.get_cached_creds(
370             account_type=self.AccountType.COMPUTER,
371             opts={'no_auth_data_required': True})
372
373         tgt = self.get_tgt(client_creds)
374
375         pac = self.get_ticket_pac(tgt)
376         self.assertIsNotNone(pac)
377
378         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
379                                         expect_pac=False)
380
381         pac = self.get_ticket_pac(ticket, expect_pac=False)
382         self.assertIsNone(pac)
383
384     def test_no_pac_service_no_auth_data_required(self):
385         client_creds = self.get_client_creds()
386         service_creds = self.get_cached_creds(
387             account_type=self.AccountType.COMPUTER,
388             opts={'no_auth_data_required': True})
389
390         tgt = self.get_tgt(client_creds, pac_request=False)
391
392         pac = self.get_ticket_pac(tgt)
393         self.assertIsNotNone(pac)
394
395         ticket = self._make_tgs_request(client_creds, service_creds, tgt,
396                                         pac_request=False, expect_pac=False)
397
398         pac = self.get_ticket_pac(ticket, expect_pac=False)
399         self.assertIsNone(pac)
400
401     def test_remove_pac_service_no_auth_data_required(self):
402         client_creds = self.get_client_creds()
403         service_creds = self.get_cached_creds(
404             account_type=self.AccountType.COMPUTER,
405             opts={'no_auth_data_required': True})
406
407         tgt = self.modified_ticket(self.get_tgt(client_creds),
408                                    exclude_pac=True)
409
410         pac = self.get_ticket_pac(tgt, expect_pac=False)
411         self.assertIsNone(pac)
412
413         self._make_tgs_request(client_creds, service_creds, tgt,
414                                expect_pac=False, expect_error=True)
415
416     def test_remove_pac_client_no_auth_data_required(self):
417         client_creds = self.get_cached_creds(
418             account_type=self.AccountType.USER,
419             opts={'no_auth_data_required': True})
420         service_creds = self.get_service_creds()
421
422         tgt = self.modified_ticket(self.get_tgt(client_creds),
423                                    exclude_pac=True)
424
425         pac = self.get_ticket_pac(tgt, expect_pac=False)
426         self.assertIsNone(pac)
427
428         self._make_tgs_request(client_creds, service_creds, tgt,
429                                expect_pac=False, expect_error=True)
430
431     def test_remove_pac(self):
432         client_creds = self.get_client_creds()
433         service_creds = self.get_service_creds()
434
435         tgt = self.modified_ticket(self.get_tgt(client_creds),
436                                    exclude_pac=True)
437
438         pac = self.get_ticket_pac(tgt, expect_pac=False)
439         self.assertIsNone(pac)
440
441         self._make_tgs_request(client_creds, service_creds, tgt,
442                                expect_pac=False, expect_error=True)
443
444     def test_upn_dns_info_ex_user(self):
445         client_creds = self.get_client_creds()
446         self._run_upn_dns_info_ex_test(client_creds)
447
448     def test_upn_dns_info_ex_mac(self):
449         mach_creds = self.get_mach_creds()
450         self._run_upn_dns_info_ex_test(mach_creds)
451
452     def test_upn_dns_info_ex_upn_user(self):
453         client_creds = self.get_cached_creds(
454             account_type=self.AccountType.USER,
455             opts={'upn': 'upn_dns_info_test_upn0@bar'})
456         self._run_upn_dns_info_ex_test(client_creds)
457
458     def test_upn_dns_info_ex_upn_mac(self):
459         mach_creds = self.get_cached_creds(
460             account_type=self.AccountType.COMPUTER,
461             opts={'upn': 'upn_dns_info_test_upn1@bar'})
462         self._run_upn_dns_info_ex_test(mach_creds)
463
464     def _run_upn_dns_info_ex_test(self, client_creds):
465         service_creds = self.get_service_creds()
466
467         samdb = self.get_samdb()
468         dn = client_creds.get_dn()
469
470         account_name = client_creds.get_username()
471         upn_name = client_creds.get_upn()
472         if upn_name is None:
473             realm = client_creds.get_realm().lower()
474             upn_name = f'{account_name}@{realm}'
475         sid = self.get_objectSid(samdb, dn)
476
477         tgt = self.get_tgt(client_creds,
478                            expected_account_name=account_name,
479                            expected_upn_name=upn_name,
480                            expected_sid=sid)
481
482         self._make_tgs_request(client_creds, service_creds, tgt,
483                                expected_account_name=account_name,
484                                expected_upn_name=upn_name,
485                                expected_sid=sid)
486
487     # Test making a TGS request.
488     def test_tgs_req(self):
489         creds = self._get_creds()
490         tgt = self._get_tgt(creds)
491         self._run_tgs(tgt, expected_error=0)
492
493     def test_renew_req(self):
494         creds = self._get_creds()
495         tgt = self._get_tgt(creds, renewable=True)
496         self._renew_tgt(tgt, expected_error=0)
497
498     def test_validate_req(self):
499         creds = self._get_creds()
500         tgt = self._get_tgt(creds, invalid=True)
501         self._validate_tgt(tgt, expected_error=0)
502
503     def test_s4u2self_req(self):
504         creds = self._get_creds()
505         tgt = self._get_tgt(creds)
506         self._s4u2self(tgt, creds, expected_error=0)
507
508     def test_user2user_req(self):
509         creds = self._get_creds()
510         tgt = self._get_tgt(creds)
511         self._user2user(tgt, creds, expected_error=0)
512
513     # Test making a request without a PAC.
514     def test_tgs_no_pac(self):
515         creds = self._get_creds()
516         tgt = self._get_tgt(creds, remove_pac=True)
517         self._run_tgs(tgt, expected_error=KDC_ERR_BADOPTION)
518
519     def test_renew_no_pac(self):
520         creds = self._get_creds()
521         tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
522         self._renew_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
523
524     def test_validate_no_pac(self):
525         creds = self._get_creds()
526         tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
527         self._validate_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
528
529     def test_s4u2self_no_pac(self):
530         creds = self._get_creds()
531         tgt = self._get_tgt(creds, remove_pac=True)
532         self._s4u2self(tgt, creds,
533                        expected_error=(KDC_ERR_GENERIC, KDC_ERR_BADOPTION),
534                        expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
535                        expect_edata=True)
536
537     def test_user2user_no_pac(self):
538         creds = self._get_creds()
539         tgt = self._get_tgt(creds, remove_pac=True)
540         self._user2user(tgt, creds, expected_error=KDC_ERR_BADOPTION)
541
542     # Test making a request with authdata and without a PAC.
543     def test_tgs_authdata_no_pac(self):
544         creds = self._get_creds()
545         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
546         self._run_tgs(tgt, expected_error=KDC_ERR_BADOPTION)
547
548     def test_renew_authdata_no_pac(self):
549         creds = self._get_creds()
550         tgt = self._get_tgt(creds, renewable=True, remove_pac=True,
551                             allow_empty_authdata=True)
552         self._renew_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
553
554     def test_validate_authdata_no_pac(self):
555         creds = self._get_creds()
556         tgt = self._get_tgt(creds, invalid=True, remove_pac=True,
557                             allow_empty_authdata=True)
558         self._validate_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
559
560     def test_s4u2self_authdata_no_pac(self):
561         creds = self._get_creds()
562         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
563         self._s4u2self(tgt, creds,
564                        expected_error=(KDC_ERR_GENERIC, KDC_ERR_BADOPTION),
565                        expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
566                        expect_edata=True)
567
568     def test_user2user_authdata_no_pac(self):
569         creds = self._get_creds()
570         tgt = self._get_tgt(creds, remove_pac=True, allow_empty_authdata=True)
571         self._user2user(tgt, creds, expected_error=KDC_ERR_BADOPTION)
572
573     # Test changing the SID in the PAC to that of another account.
574     def test_tgs_sid_mismatch_existing(self):
575         creds = self._get_creds()
576         existing_rid = self._get_existing_rid()
577         tgt = self._get_tgt(creds, new_rid=existing_rid)
578         self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
579
580     def test_renew_sid_mismatch_existing(self):
581         creds = self._get_creds()
582         existing_rid = self._get_existing_rid()
583         tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
584         self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
585
586     def test_validate_sid_mismatch_existing(self):
587         creds = self._get_creds()
588         existing_rid = self._get_existing_rid()
589         tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
590         self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
591
592     def test_s4u2self_sid_mismatch_existing(self):
593         creds = self._get_creds()
594         existing_rid = self._get_existing_rid()
595         tgt = self._get_tgt(creds, new_rid=existing_rid)
596         self._s4u2self(tgt, creds,
597                        expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
598
599     def test_user2user_sid_mismatch_existing(self):
600         creds = self._get_creds()
601         existing_rid = self._get_existing_rid()
602         tgt = self._get_tgt(creds, new_rid=existing_rid)
603         self._user2user(tgt, creds,
604                         expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
605
606     # Test changing the SID in the PAC to a non-existent one.
607     def test_tgs_sid_mismatch_nonexisting(self):
608         creds = self._get_creds()
609         nonexistent_rid = self._get_non_existent_rid()
610         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
611         self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
612
613     def test_renew_sid_mismatch_nonexisting(self):
614         creds = self._get_creds()
615         nonexistent_rid = self._get_non_existent_rid()
616         tgt = self._get_tgt(creds, renewable=True,
617                             new_rid=nonexistent_rid)
618         self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
619
620     def test_validate_sid_mismatch_nonexisting(self):
621         creds = self._get_creds()
622         nonexistent_rid = self._get_non_existent_rid()
623         tgt = self._get_tgt(creds, invalid=True,
624                             new_rid=nonexistent_rid)
625         self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
626
627     def test_s4u2self_sid_mismatch_nonexisting(self):
628         creds = self._get_creds()
629         nonexistent_rid = self._get_non_existent_rid()
630         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
631         self._s4u2self(tgt, creds,
632                        expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
633
634     def test_user2user_sid_mismatch_nonexisting(self):
635         creds = self._get_creds()
636         nonexistent_rid = self._get_non_existent_rid()
637         tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
638         self._user2user(tgt, creds,
639                         expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
640
641     # Test with an RODC-issued ticket where the client is revealed to the RODC.
642     def test_tgs_rodc_revealed(self):
643         creds = self._get_creds(replication_allowed=True,
644                                 revealed_to_rodc=True)
645         tgt = self._get_tgt(creds, from_rodc=True)
646         self._run_tgs(tgt, expected_error=0)
647
648     def test_renew_rodc_revealed(self):
649         creds = self._get_creds(replication_allowed=True,
650                                 revealed_to_rodc=True)
651         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
652         self._renew_tgt(tgt, expected_error=0)
653
654     def test_validate_rodc_revealed(self):
655         creds = self._get_creds(replication_allowed=True,
656                                 revealed_to_rodc=True)
657         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
658         self._validate_tgt(tgt, expected_error=0)
659
660     def test_s4u2self_rodc_revealed(self):
661         creds = self._get_creds(replication_allowed=True,
662                                 revealed_to_rodc=True)
663         tgt = self._get_tgt(creds, from_rodc=True)
664         self._s4u2self(tgt, creds, expected_error=0)
665
666     def test_user2user_rodc_revealed(self):
667         creds = self._get_creds(replication_allowed=True,
668                                 revealed_to_rodc=True)
669         tgt = self._get_tgt(creds, from_rodc=True)
670         self._user2user(tgt, creds, expected_error=0)
671
672     # Test with an RODC-issued ticket where the SID in the PAC is changed to
673     # that of another account.
674     def test_tgs_rodc_sid_mismatch_existing(self):
675         creds = self._get_creds(replication_allowed=True,
676                                 revealed_to_rodc=True)
677         existing_rid = self._get_existing_rid(replication_allowed=True,
678                                               revealed_to_rodc=True)
679         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
680         self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
681
682     def test_renew_rodc_sid_mismatch_existing(self):
683         creds = self._get_creds(replication_allowed=True,
684                                 revealed_to_rodc=True)
685         existing_rid = self._get_existing_rid(replication_allowed=True,
686                                               revealed_to_rodc=True)
687         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
688                             new_rid=existing_rid)
689         self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
690
691     def test_validate_rodc_sid_mismatch_existing(self):
692         creds = self._get_creds(replication_allowed=True,
693                                 revealed_to_rodc=True)
694         existing_rid = self._get_existing_rid(replication_allowed=True,
695                                        revealed_to_rodc=True)
696         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
697                             new_rid=existing_rid)
698         self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
699
700     def test_s4u2self_rodc_sid_mismatch_existing(self):
701         creds = self._get_creds(replication_allowed=True,
702                                 revealed_to_rodc=True)
703         existing_rid = self._get_existing_rid(replication_allowed=True,
704                                               revealed_to_rodc=True)
705         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
706         self._s4u2self(tgt, creds, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
707
708     def test_user2user_rodc_sid_mismatch_existing(self):
709         creds = self._get_creds(replication_allowed=True,
710                                 revealed_to_rodc=True)
711         existing_rid = self._get_existing_rid(replication_allowed=True,
712                                               revealed_to_rodc=True)
713         tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
714         self._user2user(tgt, creds,
715                         expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
716
717     # Test with an RODC-issued ticket where the SID in the PAC is changed to a
718     # non-existent one.
719     def test_tgs_rodc_sid_mismatch_nonexisting(self):
720         creds = self._get_creds(replication_allowed=True,
721                                 revealed_to_rodc=True)
722         nonexistent_rid = self._get_non_existent_rid()
723         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
724         self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
725
726     def test_renew_rodc_sid_mismatch_nonexisting(self):
727         creds = self._get_creds(replication_allowed=True,
728                                 revealed_to_rodc=True)
729         nonexistent_rid = self._get_non_existent_rid()
730         tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
731                             new_rid=nonexistent_rid)
732         self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
733
734     def test_validate_rodc_sid_mismatch_nonexisting(self):
735         creds = self._get_creds(replication_allowed=True,
736                                 revealed_to_rodc=True)
737         nonexistent_rid = self._get_non_existent_rid()
738         tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
739                             new_rid=nonexistent_rid)
740         self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
741
742     def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
743         creds = self._get_creds(replication_allowed=True,
744                                 revealed_to_rodc=True)
745         nonexistent_rid = self._get_non_existent_rid()
746         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
747         self._s4u2self(tgt, creds, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
748
749     def test_user2user_rodc_sid_mismatch_nonexisting(self):
750         creds = self._get_creds(replication_allowed=True,
751                                 revealed_to_rodc=True)
752         nonexistent_rid = self._get_non_existent_rid()
753         tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
754         self._user2user(tgt, creds,
755                         expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
756
757     # Test with an RODC-issued ticket where the client is not revealed to the
758     # RODC.
759     def test_tgs_rodc_not_revealed(self):
760         creds = self._get_creds(replication_allowed=True)
761         tgt = self._get_tgt(creds, from_rodc=True)
762         # TODO: error code
763         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
764
765     def test_renew_rodc_not_revealed(self):
766         creds = self._get_creds(replication_allowed=True)
767         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
768         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
769
770     def test_validate_rodc_not_revealed(self):
771         creds = self._get_creds(replication_allowed=True)
772         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
773         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
774
775     def test_s4u2self_rodc_not_revealed(self):
776         creds = self._get_creds(replication_allowed=True)
777         tgt = self._get_tgt(creds, from_rodc=True)
778         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
779
780     def test_user2user_rodc_not_revealed(self):
781         creds = self._get_creds(replication_allowed=True)
782         tgt = self._get_tgt(creds, from_rodc=True)
783         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
784
785     # Test with an RODC-issued ticket where the RODC account does not have the
786     # PARTIAL_SECRETS bit set.
787     def test_tgs_rodc_no_partial_secrets(self):
788         creds = self._get_creds(replication_allowed=True,
789                                 revealed_to_rodc=True)
790         tgt = self._get_tgt(creds, from_rodc=True)
791         self._remove_rodc_partial_secrets()
792         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
793
794     def test_renew_rodc_no_partial_secrets(self):
795         creds = self._get_creds(replication_allowed=True,
796                                 revealed_to_rodc=True)
797         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
798         self._remove_rodc_partial_secrets()
799         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
800
801     def test_validate_rodc_no_partial_secrets(self):
802         creds = self._get_creds(replication_allowed=True,
803                                 revealed_to_rodc=True)
804         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
805         self._remove_rodc_partial_secrets()
806         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
807
808     def test_s4u2self_rodc_no_partial_secrets(self):
809         creds = self._get_creds(replication_allowed=True,
810                                 revealed_to_rodc=True)
811         tgt = self._get_tgt(creds, from_rodc=True)
812         self._remove_rodc_partial_secrets()
813         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
814
815     def test_user2user_rodc_no_partial_secrets(self):
816         creds = self._get_creds(replication_allowed=True,
817                                 revealed_to_rodc=True)
818         tgt = self._get_tgt(creds, from_rodc=True)
819         self._remove_rodc_partial_secrets()
820         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
821
822     # Test with an RODC-issued ticket where the RODC account does not have an
823     # msDS-KrbTgtLink.
824     def test_tgs_rodc_no_krbtgt_link(self):
825         creds = self._get_creds(replication_allowed=True,
826                                 revealed_to_rodc=True)
827         tgt = self._get_tgt(creds, from_rodc=True)
828         self._remove_rodc_krbtgt_link()
829         self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
830
831     def test_renew_rodc_no_krbtgt_link(self):
832         creds = self._get_creds(replication_allowed=True,
833                                 revealed_to_rodc=True)
834         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
835         self._remove_rodc_krbtgt_link()
836         self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
837
838     def test_validate_rodc_no_krbtgt_link(self):
839         creds = self._get_creds(replication_allowed=True,
840                                 revealed_to_rodc=True)
841         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
842         self._remove_rodc_krbtgt_link()
843         self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
844
845     def test_s4u2self_rodc_no_krbtgt_link(self):
846         creds = self._get_creds(replication_allowed=True,
847                                 revealed_to_rodc=True)
848         tgt = self._get_tgt(creds, from_rodc=True)
849         self._remove_rodc_krbtgt_link()
850         self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
851
852     def test_user2user_rodc_no_krbtgt_link(self):
853         creds = self._get_creds(replication_allowed=True,
854                                 revealed_to_rodc=True)
855         tgt = self._get_tgt(creds, from_rodc=True)
856         self._remove_rodc_krbtgt_link()
857         self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
858
859     # Test with an RODC-issued ticket where the client is not allowed to
860     # replicate to the RODC.
861     def test_tgs_rodc_not_allowed(self):
862         creds = self._get_creds(revealed_to_rodc=True)
863         tgt = self._get_tgt(creds, from_rodc=True)
864         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
865
866     def test_renew_rodc_not_allowed(self):
867         creds = self._get_creds(revealed_to_rodc=True)
868         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
869         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
870
871     def test_validate_rodc_not_allowed(self):
872         creds = self._get_creds(revealed_to_rodc=True)
873         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
874         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
875
876     def test_s4u2self_rodc_not_allowed(self):
877         creds = self._get_creds(revealed_to_rodc=True)
878         tgt = self._get_tgt(creds, from_rodc=True)
879         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
880
881     def test_user2user_rodc_not_allowed(self):
882         creds = self._get_creds(revealed_to_rodc=True)
883         tgt = self._get_tgt(creds, from_rodc=True)
884         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
885
886     # Test with an RODC-issued ticket where the client is denied from
887     # replicating to the RODC.
888     def test_tgs_rodc_denied(self):
889         creds = self._get_creds(replication_denied=True,
890                                 revealed_to_rodc=True)
891         tgt = self._get_tgt(creds, from_rodc=True)
892         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
893
894     def test_renew_rodc_denied(self):
895         creds = self._get_creds(replication_denied=True,
896                                 revealed_to_rodc=True)
897         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
898         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
899
900     def test_validate_rodc_denied(self):
901         creds = self._get_creds(replication_denied=True,
902                                 revealed_to_rodc=True)
903         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
904         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
905
906     def test_s4u2self_rodc_denied(self):
907         creds = self._get_creds(replication_denied=True,
908                                 revealed_to_rodc=True)
909         tgt = self._get_tgt(creds, from_rodc=True)
910         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
911
912     def test_user2user_rodc_denied(self):
913         creds = self._get_creds(replication_denied=True,
914                                 revealed_to_rodc=True)
915         tgt = self._get_tgt(creds, from_rodc=True)
916         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
917
918     # Test with an RODC-issued ticket where the client is both allowed and
919     # denied replicating to the RODC.
920     def test_tgs_rodc_allowed_denied(self):
921         creds = self._get_creds(replication_allowed=True,
922                                 replication_denied=True,
923                                 revealed_to_rodc=True)
924         tgt = self._get_tgt(creds, from_rodc=True)
925         self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
926
927     def test_renew_rodc_allowed_denied(self):
928         creds = self._get_creds(replication_allowed=True,
929                                 replication_denied=True,
930                                 revealed_to_rodc=True)
931         tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
932         self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
933
934     def test_validate_rodc_allowed_denied(self):
935         creds = self._get_creds(replication_allowed=True,
936                                 replication_denied=True,
937                                 revealed_to_rodc=True)
938         tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
939         self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
940
941     def test_s4u2self_rodc_allowed_denied(self):
942         creds = self._get_creds(replication_allowed=True,
943                                 replication_denied=True,
944                                 revealed_to_rodc=True)
945         tgt = self._get_tgt(creds, from_rodc=True)
946         self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
947
948     def test_user2user_rodc_allowed_denied(self):
949         creds = self._get_creds(replication_allowed=True,
950                                 replication_denied=True,
951                                 revealed_to_rodc=True)
952         tgt = self._get_tgt(creds, from_rodc=True)
953         self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
954
955     # Test user-to-user with incorrect service principal names.
956     def test_user2user_matching_sname_host(self):
957         creds = self._get_creds()
958         tgt = self._get_tgt(creds)
959
960         user_name = creds.get_username()
961         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
962                                           names=['host', user_name])
963
964         self._user2user(tgt, creds, sname=sname,
965                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
966
967     def test_user2user_matching_sname_no_host(self):
968         creds = self._get_creds()
969         tgt = self._get_tgt(creds)
970
971         user_name = creds.get_username()
972         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
973                                           names=[user_name])
974
975         self._user2user(tgt, creds, sname=sname, expected_error=0)
976
977     def test_user2user_wrong_sname(self):
978         creds = self._get_creds()
979         tgt = self._get_tgt(creds)
980
981         other_creds = self._get_mach_creds()
982         user_name = other_creds.get_username()
983         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
984                                           names=[user_name])
985
986         self._user2user(tgt, creds, sname=sname,
987                         expected_error=(KDC_ERR_BADMATCH,
988                                         KDC_ERR_BADOPTION))
989
990     def test_user2user_non_existent_sname(self):
991         creds = self._get_creds()
992         tgt = self._get_tgt(creds)
993
994         sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
995                                           names=['host', 'non_existent_user'])
996
997         self._user2user(tgt, creds, sname=sname,
998                         expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
999
1000     def test_user2user_service_ticket(self):
1001         creds = self._get_creds()
1002         tgt = self._get_tgt(creds)
1003
1004         service_creds = self.get_service_creds()
1005         service_ticket = self.get_service_ticket(tgt, service_creds)
1006
1007         self._user2user(service_ticket, creds,
1008                         expected_error=(KDC_ERR_MODIFIED, KDC_ERR_POLICY))
1009
1010     def _get_tgt(self,
1011                  client_creds,
1012                  renewable=False,
1013                  invalid=False,
1014                  from_rodc=False,
1015                  new_rid=None,
1016                  remove_pac=False,
1017                  allow_empty_authdata=False):
1018         self.assertFalse(renewable and invalid)
1019
1020         if remove_pac:
1021             self.assertIsNone(new_rid)
1022
1023         tgt = self.get_tgt(client_creds)
1024
1025         if from_rodc:
1026             krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
1027         else:
1028             krbtgt_creds = self.get_krbtgt_creds()
1029
1030         if new_rid is not None:
1031             def change_sid_fn(pac):
1032                 for pac_buffer in pac.buffers:
1033                     if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
1034                         logon_info = pac_buffer.info.info
1035
1036                         logon_info.info3.base.rid = new_rid
1037
1038                 return pac
1039
1040             modify_pac_fn = change_sid_fn
1041         else:
1042             modify_pac_fn = None
1043
1044         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
1045
1046         if remove_pac:
1047             checksum_keys = None
1048         else:
1049             checksum_keys = {
1050                 krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
1051             }
1052
1053         if renewable:
1054             def set_renewable(enc_part):
1055                 # Set the renewable flag.
1056                 renewable_flag = krb5_asn1.TicketFlags('renewable')
1057                 pos = len(tuple(renewable_flag)) - 1
1058
1059                 flags = enc_part['flags']
1060                 self.assertLessEqual(pos, len(flags))
1061
1062                 new_flags = flags[:pos] + '1' + flags[pos + 1:]
1063                 enc_part['flags'] = new_flags
1064
1065                 # Set the renew-till time to be in the future.
1066                 renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
1067                 enc_part['renew-till'] = renew_till
1068
1069                 return enc_part
1070
1071             modify_fn = set_renewable
1072         elif invalid:
1073             def set_invalid(enc_part):
1074                 # Set the invalid flag.
1075                 invalid_flag = krb5_asn1.TicketFlags('invalid')
1076                 pos = len(tuple(invalid_flag)) - 1
1077
1078                 flags = enc_part['flags']
1079                 self.assertLessEqual(pos, len(flags))
1080
1081                 new_flags = flags[:pos] + '1' + flags[pos + 1:]
1082                 enc_part['flags'] = new_flags
1083
1084                 # Set the ticket start time to be in the past.
1085                 past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
1086                 enc_part['starttime'] = past_time
1087
1088                 return enc_part
1089
1090             modify_fn = set_invalid
1091         else:
1092             modify_fn = None
1093
1094         return self.modified_ticket(
1095             tgt,
1096             new_ticket_key=krbtgt_key,
1097             modify_fn=modify_fn,
1098             modify_pac_fn=modify_pac_fn,
1099             exclude_pac=remove_pac,
1100             allow_empty_authdata=allow_empty_authdata,
1101             update_pac_checksums=not remove_pac,
1102             checksum_keys=checksum_keys)
1103
1104     def _remove_rodc_partial_secrets(self):
1105         samdb = self.get_samdb()
1106
1107         rodc_ctx = self.get_mock_rodc_ctx()
1108         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
1109
1110         def add_rodc_partial_secrets():
1111             msg = ldb.Message()
1112             msg.dn = rodc_dn
1113             msg['userAccountControl'] = ldb.MessageElement(
1114                 str(rodc_ctx.userAccountControl),
1115                 ldb.FLAG_MOD_REPLACE,
1116                 'userAccountControl')
1117             samdb.modify(msg)
1118
1119         self.addCleanup(add_rodc_partial_secrets)
1120
1121         uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
1122
1123         msg = ldb.Message()
1124         msg.dn = rodc_dn
1125         msg['userAccountControl'] = ldb.MessageElement(
1126             str(uac),
1127             ldb.FLAG_MOD_REPLACE,
1128             'userAccountControl')
1129         samdb.modify(msg)
1130
1131     def _remove_rodc_krbtgt_link(self):
1132         samdb = self.get_samdb()
1133
1134         rodc_ctx = self.get_mock_rodc_ctx()
1135         rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
1136
1137         def add_rodc_krbtgt_link():
1138             msg = ldb.Message()
1139             msg.dn = rodc_dn
1140             msg['msDS-KrbTgtLink'] = ldb.MessageElement(
1141                 rodc_ctx.new_krbtgt_dn,
1142                 ldb.FLAG_MOD_ADD,
1143                 'msDS-KrbTgtLink')
1144             samdb.modify(msg)
1145
1146         self.addCleanup(add_rodc_krbtgt_link)
1147
1148         msg = ldb.Message()
1149         msg.dn = rodc_dn
1150         msg['msDS-KrbTgtLink'] = ldb.MessageElement(
1151             [],
1152             ldb.FLAG_MOD_DELETE,
1153             'msDS-KrbTgtLink')
1154         samdb.modify(msg)
1155
1156     def _get_creds(self,
1157                    replication_allowed=False,
1158                    replication_denied=False,
1159                    revealed_to_rodc=False):
1160         return self.get_cached_creds(
1161             account_type=self.AccountType.COMPUTER,
1162             opts={
1163                 'allowed_replication_mock': replication_allowed,
1164                 'denied_replication_mock': replication_denied,
1165                 'revealed_to_mock_rodc': revealed_to_rodc,
1166                 'id': 0
1167             })
1168
1169     def _get_existing_rid(self,
1170                           replication_allowed=False,
1171                           replication_denied=False,
1172                           revealed_to_rodc=False):
1173         other_creds = self.get_cached_creds(
1174             account_type=self.AccountType.COMPUTER,
1175             opts={
1176                 'allowed_replication_mock': replication_allowed,
1177                 'denied_replication_mock': replication_denied,
1178                 'revealed_to_mock_rodc': revealed_to_rodc,
1179                 'id': 1
1180             })
1181
1182         samdb = self.get_samdb()
1183
1184         other_dn = other_creds.get_dn()
1185         other_sid = self.get_objectSid(samdb, other_dn)
1186
1187         other_rid = int(other_sid.rsplit('-', 1)[1])
1188
1189         return other_rid
1190
1191     def _get_mach_creds(self):
1192         return self.get_cached_creds(
1193             account_type=self.AccountType.COMPUTER,
1194             opts={
1195                 'allowed_replication_mock': True,
1196                 'denied_replication_mock': False,
1197                 'revealed_to_mock_rodc': True,
1198                 'id': 2
1199             })
1200
1201     def _get_non_existent_rid(self):
1202         return (1 << 30) - 1
1203
1204     def _run_tgs(self, tgt, expected_error):
1205         target_creds = self.get_service_creds()
1206         self._tgs_req(tgt, expected_error, target_creds)
1207
1208     def _renew_tgt(self, tgt, expected_error):
1209         krbtgt_creds = self.get_krbtgt_creds()
1210         kdc_options = str(krb5_asn1.KDCOptions('renew'))
1211         self._tgs_req(tgt, expected_error, krbtgt_creds,
1212                       kdc_options=kdc_options)
1213
1214     def _validate_tgt(self, tgt, expected_error):
1215         krbtgt_creds = self.get_krbtgt_creds()
1216         kdc_options = str(krb5_asn1.KDCOptions('validate'))
1217         self._tgs_req(tgt, expected_error, krbtgt_creds,
1218                       kdc_options=kdc_options)
1219
1220     def _s4u2self(self, tgt, tgt_creds, expected_error,
1221                   expect_edata=False, expected_status=None):
1222         user_creds = self._get_mach_creds()
1223
1224         user_name = user_creds.get_username()
1225         user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1226                                                names=[user_name])
1227         user_realm = user_creds.get_realm()
1228
1229         def generate_s4u2self_padata(_kdc_exchange_dict,
1230                                      _callback_dict,
1231                                      req_body):
1232             padata = self.PA_S4U2Self_create(
1233                 name=user_cname,
1234                 realm=user_realm,
1235                 tgt_session_key=tgt.session_key,
1236                 ctype=None)
1237
1238             return [padata], req_body
1239
1240         return self._tgs_req(tgt, expected_error, tgt_creds,
1241                              expected_cname=user_cname,
1242                              generate_padata_fn=generate_s4u2self_padata,
1243                              expect_claims=False, expect_edata=expect_edata,
1244                              expected_status=expected_status)
1245
1246     def _user2user(self, tgt, tgt_creds, expected_error, sname=None):
1247         user_creds = self._get_mach_creds()
1248         user_tgt = self.get_tgt(user_creds)
1249
1250         kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
1251         self._tgs_req(user_tgt, expected_error, tgt_creds,
1252                       kdc_options=kdc_options,
1253                       additional_ticket=tgt,
1254                       sname=sname)
1255
1256     def _tgs_req(self, tgt, expected_error, target_creds,
1257                  kdc_options='0',
1258                  expected_cname=None,
1259                  additional_ticket=None,
1260                  generate_padata_fn=None,
1261                  sname=None,
1262                  expect_claims=True,
1263                  expect_edata=False,
1264                  expected_status=None):
1265         srealm = target_creds.get_realm()
1266
1267         if sname is None:
1268             target_name = target_creds.get_username()
1269             if target_name == 'krbtgt':
1270                 sname = self.PrincipalName_create(name_type=NT_SRV_INST,
1271                                                   names=[target_name, srealm])
1272             else:
1273                 if target_name[-1] == '$':
1274                     target_name = target_name[:-1]
1275                 sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
1276                                                   names=['host', target_name])
1277
1278         if additional_ticket is not None:
1279             additional_tickets = [additional_ticket.ticket]
1280             decryption_key = additional_ticket.session_key
1281         else:
1282             additional_tickets = None
1283             decryption_key = self.TicketDecryptionKey_from_creds(
1284                 target_creds)
1285
1286         subkey = self.RandomKey(tgt.session_key.etype)
1287
1288         etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
1289
1290         if expected_error:
1291             check_error_fn = self.generic_check_kdc_error
1292             check_rep_fn = None
1293         else:
1294             check_error_fn = None
1295             check_rep_fn = self.generic_check_kdc_rep
1296
1297         if expected_cname is None:
1298             expected_cname = tgt.cname
1299
1300         kdc_exchange_dict = self.tgs_exchange_dict(
1301             expected_crealm=tgt.crealm,
1302             expected_cname=expected_cname,
1303             expected_srealm=srealm,
1304             expected_sname=sname,
1305             ticket_decryption_key=decryption_key,
1306             generate_padata_fn=generate_padata_fn,
1307             check_error_fn=check_error_fn,
1308             check_rep_fn=check_rep_fn,
1309             check_kdc_private_fn=self.generic_check_kdc_private,
1310             expected_error_mode=expected_error,
1311             expected_status=expected_status,
1312             tgt=tgt,
1313             authenticator_subkey=subkey,
1314             kdc_options=kdc_options,
1315             expect_edata=expect_edata,
1316             expect_claims=expect_claims)
1317
1318         rep = self._generic_kdc_exchange(kdc_exchange_dict,
1319                                          cname=None,
1320                                          realm=srealm,
1321                                          sname=sname,
1322                                          etypes=etypes,
1323                                          additional_tickets=additional_tickets)
1324         if expected_error:
1325             self.check_error_rep(rep, expected_error)
1326             return None
1327         else:
1328             self.check_reply(rep, KRB_TGS_REP)
1329             return kdc_exchange_dict['rep_ticket_creds']
1330
1331
if __name__ == "__main__":
    import unittest

    # Both debug switches are off when the file is run directly.
    global_asn1_print = global_hexdump = False
    unittest.main()