1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.tests import TestCase, delete_force
24 from samba.auth import system_session
25 from samba.credentials import (
30 from samba.dcerpc import lsa, netlogon, ntlmssp, security, srvsvc
31 from samba.dcerpc.netlogon import (
33 netr_WorkstationInformation,
34 MSV1_0_ALLOW_MSVCHAPV2
36 from samba.dcerpc.misc import SEC_CHAN_WKSTA
37 from samba.dsdb import (
38 UF_WORKSTATION_TRUST_ACCOUNT,
41 from samba.ndr import ndr_pack, ndr_unpack
42 from samba.samdb import SamDB
43 from samba import NTSTATUSError, ntstatus
44 from samba.common import get_string
45 from samba.sd_utils import SDUtils
51 Integration tests for pycredentials
57 class PyCredentialsTests(TestCase):
62 self.server = os.environ["SERVER"]
63 self.domain = os.environ["DOMAIN"]
64 self.host = os.environ["SERVER_IP"]
65 self.lp = self.get_loadparm()
67 self.credentials = self.get_credentials()
69 self.session = system_session()
70 self.ldb = SamDB(url="ldap://%s" % self.host,
71 session_info=self.session,
72 credentials=self.credentials,
75 self.create_machine_account()
76 self.create_user_account()
80 delete_force(self.ldb, self.machine_dn)
81 delete_force(self.ldb, self.user_dn)
83 # Until a successful netlogon connection has been established there will
84 # not be a valid authenticator associated with the credentials
85 # and new_client_authenticator should throw a ValueError
def test_no_netlogon_connection(self):
    """new_client_authenticator must raise ValueError when no netlogon
    connection has been opened with the machine credentials.
    """
    with self.assertRaises(ValueError):
        self.machine_creds.new_client_authenticator()
90 # Once a netlogon connection has been established,
91 # new_client_authenticator should return a value
def test_have_netlogon_connection(self):
    """After opening a netlogon connection, the machine credentials
    must hand back a non-None client authenticator.
    """
    # The connection object is not used directly below; it is opened
    # first and kept referenced for the duration of the test.
    connection = self.get_netlogon_connection()
    authenticator = self.machine_creds.new_client_authenticator()
    self.assertIsNotNone(authenticator)
98 # Get an authenticator and use it on a sequence of operations requiring
def test_client_authenticator(self):
    """Run a sequence of netlogon operations over one connection,
    fetching a fresh authenticator pair before each of them.

    The sequence is one NetrLogonSamLogonWithFlags call followed by
    three NetrLogonGetDomainInfo calls.
    """
    connection = self.get_netlogon_connection()
    operations = (
        self.do_NetrLogonSamLogonWithFlags,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
    )
    for operation in operations:
        # Every call gets its own (current, subsequent) authenticator pair.
        current, subsequent = self.get_authenticator(connection)
        operation(connection, current, subsequent)
111 # Test using LogonGetDomainInfo to update dNSHostName to an allowed value.
112 def test_set_dns_hostname_valid(self):
113 c = self.get_netlogon_connection()
114 authenticator, subsequent = self.get_authenticator(c)
116 domain_hostname = self.ldb.domain_dns_name()
118 new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
119 new_dns_hostname = new_dns_hostname.encode('utf-8')
121 query = netr_WorkstationInformation()
122 query.os_name = lsa.String('some OS')
123 query.dns_hostname = new_dns_hostname
125 c.netr_LogonGetDomainInfo(
126 server_name=self.server,
127 computer_name=self.user_creds.get_workstation(),
128 credential=authenticator,
129 return_authenticator=subsequent,
135 res = self.ldb.search(self.machine_dn,
136 scope=ldb.SCOPE_BASE,
137 attrs=['dNSHostName'])
138 self.assertEqual(1, len(res))
140 got_dns_hostname = res[0].get('dNSHostName', idx=0)
141 self.assertEqual(new_dns_hostname, got_dns_hostname)
143 # Test using LogonGetDomainInfo to update dNSHostName to an allowed value,
144 # when we are denied the right to do so.
145 def test_set_dns_hostname_valid_denied(self):
146 c = self.get_netlogon_connection()
147 authenticator, subsequent = self.get_authenticator(c)
149 res = self.ldb.search(self.machine_dn,
150 scope=ldb.SCOPE_BASE,
152 self.assertEqual(1, len(res))
154 machine_sid = ndr_unpack(security.dom_sid,
155 res[0].get('objectSid', idx=0))
157 sd_utils = SDUtils(self.ldb)
159 # Deny Validated Write and Write Property.
160 mod = (f'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;'
162 sd_utils.dacl_add_ace(self.machine_dn, mod)
164 domain_hostname = self.ldb.domain_dns_name()
166 new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
167 new_dns_hostname = new_dns_hostname.encode('utf-8')
169 query = netr_WorkstationInformation()
170 query.os_name = lsa.String('some OS')
171 query.dns_hostname = new_dns_hostname
173 c.netr_LogonGetDomainInfo(
174 server_name=self.server,
175 computer_name=self.user_creds.get_workstation(),
176 credential=authenticator,
177 return_authenticator=subsequent,
183 res = self.ldb.search(self.machine_dn,
184 scope=ldb.SCOPE_BASE,
185 attrs=['dNSHostName'])
186 self.assertEqual(1, len(res))
188 got_dns_hostname = res[0].get('dNSHostName', idx=0)
189 self.assertEqual(new_dns_hostname, got_dns_hostname)
191 # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
192 # invalid value, even with Validated Write.
193 def test_set_dns_hostname_invalid_validated_write(self):
194 c = self.get_netlogon_connection()
195 authenticator, subsequent = self.get_authenticator(c)
197 res = self.ldb.search(self.machine_dn,
198 scope=ldb.SCOPE_BASE,
200 self.assertEqual(1, len(res))
202 machine_sid = ndr_unpack(security.dom_sid,
203 res[0].get('objectSid', idx=0))
205 sd_utils = SDUtils(self.ldb)
207 # Grant Validated Write.
208 mod = (f'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
210 sd_utils.dacl_add_ace(self.machine_dn, mod)
212 new_dns_hostname = b'invalid'
214 query = netr_WorkstationInformation()
215 query.os_name = lsa.String('some OS')
216 query.dns_hostname = new_dns_hostname
218 c.netr_LogonGetDomainInfo(
219 server_name=self.server,
220 computer_name=self.user_creds.get_workstation(),
221 credential=authenticator,
222 return_authenticator=subsequent,
228 res = self.ldb.search(self.machine_dn,
229 scope=ldb.SCOPE_BASE,
230 attrs=['dNSHostName'])
231 self.assertEqual(1, len(res))
233 got_dns_hostname = res[0].get('dNSHostName', idx=0)
234 self.assertIsNone(got_dns_hostname)
236 # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
237 # invalid value, even with Write Property.
238 def test_set_dns_hostname_invalid_write_property(self):
239 c = self.get_netlogon_connection()
240 authenticator, subsequent = self.get_authenticator(c)
242 res = self.ldb.search(self.machine_dn,
243 scope=ldb.SCOPE_BASE,
245 self.assertEqual(1, len(res))
247 machine_sid = ndr_unpack(security.dom_sid,
248 res[0].get('objectSid', idx=0))
250 sd_utils = SDUtils(self.ldb)
252 # Grant Write Property.
253 mod = (f'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
255 sd_utils.dacl_add_ace(self.machine_dn, mod)
257 new_dns_hostname = b'invalid'
259 query = netr_WorkstationInformation()
260 query.os_name = lsa.String('some OS')
261 query.dns_hostname = new_dns_hostname
263 c.netr_LogonGetDomainInfo(
264 server_name=self.server,
265 computer_name=self.user_creds.get_workstation(),
266 credential=authenticator,
267 return_authenticator=subsequent,
273 res = self.ldb.search(self.machine_dn,
274 scope=ldb.SCOPE_BASE,
275 attrs=['dNSHostName'])
276 self.assertEqual(1, len(res))
278 got_dns_hostname = res[0].get('dNSHostName', idx=0)
279 self.assertIsNone(got_dns_hostname)
281 # Show we can't use LogonGetDomainInfo to set the dNSHostName to just the
283 def test_set_dns_hostname_to_machine_name(self):
284 c = self.get_netlogon_connection()
285 authenticator, subsequent = self.get_authenticator(c)
287 new_dns_hostname = self.machine_name.encode('utf-8')
289 query = netr_WorkstationInformation()
290 query.os_name = lsa.String('some OS')
291 query.dns_hostname = new_dns_hostname
293 c.netr_LogonGetDomainInfo(
294 server_name=self.server,
295 computer_name=self.user_creds.get_workstation(),
296 credential=authenticator,
297 return_authenticator=subsequent,
303 res = self.ldb.search(self.machine_dn,
304 scope=ldb.SCOPE_BASE,
305 attrs=['dNSHostName'])
306 self.assertEqual(1, len(res))
308 got_dns_hostname = res[0].get('dNSHostName', idx=0)
309 self.assertIsNone(got_dns_hostname)
311 # Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid
313 def test_set_dns_hostname_invalid_suffix(self):
314 c = self.get_netlogon_connection()
315 authenticator, subsequent = self.get_authenticator(c)
317 domain_hostname = self.ldb.domain_dns_name()
319 new_dns_hostname = f'{self.machine_name}.foo.{domain_hostname}'
320 new_dns_hostname = new_dns_hostname.encode('utf-8')
322 query = netr_WorkstationInformation()
323 query.os_name = lsa.String('some OS')
324 query.dns_hostname = new_dns_hostname
326 c.netr_LogonGetDomainInfo(
327 server_name=self.server,
328 computer_name=self.user_creds.get_workstation(),
329 credential=authenticator,
330 return_authenticator=subsequent,
336 res = self.ldb.search(self.machine_dn,
337 scope=ldb.SCOPE_BASE,
338 attrs=['dNSHostName'])
339 self.assertEqual(1, len(res))
341 got_dns_hostname = res[0].get('dNSHostName', idx=0)
342 self.assertIsNone(got_dns_hostname)
344 # Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName
345 # update, but other attributes are still updated.
346 def test_set_dns_hostname_with_flag(self):
347 c = self.get_netlogon_connection()
348 authenticator, subsequent = self.get_authenticator(c)
350 domain_hostname = self.ldb.domain_dns_name()
352 new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
353 new_dns_hostname = new_dns_hostname.encode('utf-8')
355 operating_system = 'some OS'
357 query = netr_WorkstationInformation()
358 query.os_name = lsa.String(operating_system)
360 query.dns_hostname = new_dns_hostname
361 query.workstation_flags = netlogon.NETR_WS_FLAG_HANDLES_SPN_UPDATE
363 c.netr_LogonGetDomainInfo(
364 server_name=self.server,
365 computer_name=self.user_creds.get_workstation(),
366 credential=authenticator,
367 return_authenticator=subsequent,
373 res = self.ldb.search(self.machine_dn,
374 scope=ldb.SCOPE_BASE,
375 attrs=['dNSHostName',
377 self.assertEqual(1, len(res))
379 got_dns_hostname = res[0].get('dNSHostName', idx=0)
380 self.assertIsNone(got_dns_hostname)
382 got_os = res[0].get('operatingSystem', idx=0)
383 self.assertEqual(operating_system.encode('utf-8'), got_os)
385 def test_SamLogonEx(self):
386 c = self.get_netlogon_connection()
388 logon = samlogon_logon_info(self.domain,
392 logon_level = netlogon.NetlogonNetworkTransitiveInformation
393 validation_level = netlogon.NetlogonValidationSamInfo4
397 c.netr_LogonSamLogonEx(self.server,
398 self.user_creds.get_workstation(),
403 except NTSTATUSError as e:
404 enum = ctypes.c_uint32(e.args[0]).value
405 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
406 self.fail("got wrong password error")
410 def test_SamLogonEx_no_domain(self):
411 c = self.get_netlogon_connection()
413 self.user_creds.set_domain('')
415 logon = samlogon_logon_info(self.domain,
419 logon_level = netlogon.NetlogonNetworkTransitiveInformation
420 validation_level = netlogon.NetlogonValidationSamInfo4
424 c.netr_LogonSamLogonEx(self.server,
425 self.user_creds.get_workstation(),
430 except NTSTATUSError as e:
431 enum = ctypes.c_uint32(e.args[0]).value
432 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
433 self.fail("got wrong password error")
435 self.fail("got unexpected error" + str(e))
437 def test_SamLogonExNTLM(self):
438 c = self.get_netlogon_connection()
440 logon = samlogon_logon_info(self.domain,
443 flags=CLI_CRED_NTLM_AUTH)
445 logon_level = netlogon.NetlogonNetworkTransitiveInformation
446 validation_level = netlogon.NetlogonValidationSamInfo4
450 c.netr_LogonSamLogonEx(self.server,
451 self.user_creds.get_workstation(),
456 except NTSTATUSError as e:
457 enum = ctypes.c_uint32(e.args[0]).value
458 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
459 self.fail("got wrong password error")
463 def test_SamLogonExMSCHAPv2(self):
464 c = self.get_netlogon_connection()
466 logon = samlogon_logon_info(self.domain,
469 flags=CLI_CRED_NTLM_AUTH)
471 logon.identity_info.parameter_control = MSV1_0_ALLOW_MSVCHAPV2
473 logon_level = netlogon.NetlogonNetworkTransitiveInformation
474 validation_level = netlogon.NetlogonValidationSamInfo4
478 c.netr_LogonSamLogonEx(self.server,
479 self.user_creds.get_workstation(),
484 except NTSTATUSError as e:
485 enum = ctypes.c_uint32(e.args[0]).value
486 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
487 self.fail("got wrong password error")
491 # Test Credentials.encrypt_netr_crypt_password
492 # By performing a NetrServerPasswordSet2
# And then logging on using the new password.
495 def test_encrypt_netr_password(self):
496 # Change the password
497 self.do_Netr_ServerPasswordSet2()
498 # Now use the new password to perform an operation
499 srvsvc.srvsvc("ncacn_np:%s" % (self.server),
503 # Change the current machine account password with a
504 # netr_ServerPasswordSet2 call.
506 def do_Netr_ServerPasswordSet2(self):
507 c = self.get_netlogon_connection()
508 (authenticator, subsequent) = self.get_authenticator(c)
511 newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
512 encoded = newpass.encode('utf-16-le')
513 pwd_len = len(encoded)
514 filler = [x if isinstance(x, int) else ord(x) for x in os.urandom(DATA_LEN - pwd_len)]
515 pwd = netlogon.netr_CryptPassword()
517 pwd.data = filler + [x if isinstance(x, int) else ord(x) for x in encoded]
518 self.machine_creds.encrypt_netr_crypt_password(pwd)
519 c.netr_ServerPasswordSet2(self.server,
520 self.machine_creds.get_workstation(),
526 self.machine_pass = newpass
527 self.machine_creds.set_password(newpass)
529 # Establish sealed schannel netlogon connection over TCP/IP
531 def get_netlogon_connection(self):
532 return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
537 # Create the machine account
538 def create_machine_account(self):
539 self.machine_pass = samba.generate_random_password(32, 32)
540 self.machine_name = MACHINE_NAME
541 self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
543 # remove the account if it exists, this will happen if a previous test
545 delete_force(self.ldb, self.machine_dn)
547 utf16pw = ('"%s"' % get_string(self.machine_pass)).encode('utf-16-le')
549 "dn": self.machine_dn,
550 "objectclass": "computer",
551 "sAMAccountName": "%s$" % self.machine_name,
552 "userAccountControl":
553 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
554 "unicodePwd": utf16pw})
556 self.machine_creds = Credentials()
557 self.machine_creds.guess(self.get_loadparm())
558 self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
559 self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
560 self.machine_creds.set_password(self.machine_pass)
561 self.machine_creds.set_username(self.machine_name + "$")
562 self.machine_creds.set_workstation(self.machine_name)
565 # Create a test user account
566 def create_user_account(self):
567 self.user_pass = samba.generate_random_password(32, 32)
568 self.user_name = USER_NAME
569 self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
571 # remove the account if it exists, this will happen if a previous test
573 delete_force(self.ldb, self.user_dn)
575 utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
578 "objectclass": "user",
579 "sAMAccountName": "%s" % self.user_name,
580 "userAccountControl": str(UF_NORMAL_ACCOUNT),
581 "unicodePwd": utf16pw})
583 self.user_creds = Credentials()
584 self.user_creds.guess(self.get_loadparm())
585 self.user_creds.set_password(self.user_pass)
586 self.user_creds.set_username(self.user_name)
587 self.user_creds.set_workstation(self.machine_name)
591 # Get the authenticator from the machine creds.
def get_authenticator(self, c):
    """Build the authenticator pair for the next netlogon request.

    A new client authenticator is requested from the machine
    credentials and copied into a netr_Authenticator.

    :param c: netlogon connection; not used by this method.
    :return: tuple (current, subsequent), where current carries the
             credential data and timestamp of the new client
             authenticator and subsequent is an unpopulated
             netr_Authenticator.
    """
    client_auth = self.machine_creds.new_client_authenticator()

    # Normalise the credential to a list of integers, whether iterating
    # it yields ints or single-character strings.
    credential_data = [
        item if isinstance(item, int) else ord(item)
        for item in client_auth["credential"]
    ]

    current = netr_Authenticator()
    current.cred.data = credential_data
    current.timestamp = client_auth["timestamp"]

    return (current, netr_Authenticator())
601 def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
602 logon = samlogon_logon_info(self.domain,
606 logon_level = netlogon.NetlogonNetworkTransitiveInformation
607 validation_level = netlogon.NetlogonValidationSamInfo4
609 c.netr_LogonSamLogonWithFlags(self.server,
610 self.user_creds.get_workstation(),
618 def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
619 query = netr_WorkstationInformation()
621 c.netr_LogonGetDomainInfo(self.server,
622 self.user_creds.get_workstation(),
629 # Build the logon data required by NetrLogonSamLogonWithFlags
632 def samlogon_logon_info(domain_name, computer_name, creds,
633 flags=CLI_CRED_NTLMv2_AUTH):
635 target_info_blob = samlogon_target(domain_name, computer_name)
637 challenge = b"abcdefgh"
638 # User account under test
639 response = creds.get_ntlm_response(flags=flags,
641 target_info=target_info_blob)
643 logon = netlogon.netr_NetworkInfo()
645 logon.challenge = [x if isinstance(x, int) else ord(x) for x in challenge]
646 logon.nt = netlogon.netr_ChallengeResponse()
647 logon.nt.length = len(response["nt_response"])
648 logon.nt.data = [x if isinstance(x, int) else ord(x) for x in response["nt_response"]]
649 logon.identity_info = netlogon.netr_IdentityInfo()
651 (username, domain) = creds.get_ntlm_username_domain()
652 logon.identity_info.domain_name.string = domain
653 logon.identity_info.account_name.string = username
654 logon.identity_info.workstation.string = creds.get_workstation()
659 # Build the samlogon target info.
def samlogon_target(domain_name, computer_name):
    """Return the NDR-packed AV_PAIR_LIST used as samlogon target info.

    The list holds three pairs, in this order: the NetBIOS domain name,
    the NetBIOS computer name, and the end-of-list marker.

    :param domain_name: value stored in the MsvAvNbDomainName pair.
    :param computer_name: value stored in the MsvAvNbComputerName pair.
    :return: the result of ndr_pack() on the populated AV_PAIR_LIST.
    """
    # AvId is assigned before Value on each pair, as in the original code.
    nb_computer = ntlmssp.AV_PAIR()
    nb_computer.AvId = ntlmssp.MsvAvNbComputerName
    nb_computer.Value = computer_name

    nb_domain = ntlmssp.AV_PAIR()
    nb_domain.AvId = ntlmssp.MsvAvNbDomainName
    nb_domain.Value = domain_name

    terminator = ntlmssp.AV_PAIR()
    terminator.AvId = ntlmssp.MsvAvEOL

    # The domain pair goes first even though the computer pair was
    # built first.
    pairs = [nb_domain, nb_computer, terminator]

    av_list = ntlmssp.AV_PAIR_LIST()
    av_list.count = len(pairs)
    av_list.pair = pairs

    return ndr_pack(av_list)