1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.tests import TestCase, delete_force
22 from samba.auth import system_session
23 from samba.credentials import (
28 from samba.dcerpc import netlogon, ntlmssp, srvsvc
29 from samba.dcerpc.netlogon import (
31 netr_WorkstationInformation,
32 MSV1_0_ALLOW_MSVCHAPV2
34 from samba.dcerpc.misc import SEC_CHAN_WKSTA
35 from samba.dsdb import (
36 UF_WORKSTATION_TRUST_ACCOUNT,
39 from samba.ndr import ndr_pack
40 from samba.samdb import SamDB
41 from samba import NTSTATUSError, ntstatus
45 Integration tests for pycredentials
52 class PyCredentialsTests(TestCase):
55 super(PyCredentialsTests, self).setUp()
57 self.server = os.environ["SERVER"]
58 self.domain = os.environ["DOMAIN"]
59 self.host = os.environ["SERVER_IP"]
60 self.lp = self.get_loadparm()
62 self.credentials = self.get_credentials()
64 self.session = system_session()
65 self.ldb = SamDB(url="ldap://%s" % self.host,
66 session_info=self.session,
67 credentials=self.credentials,
70 self.create_machine_account()
71 self.create_user_account()
75 super(PyCredentialsTests, self).tearDown()
76 delete_force(self.ldb, self.machine_dn)
77 delete_force(self.ldb, self.user_dn)
79 # Until a successful netlogon connection has been established there will
80 # not be a valid authenticator associated with the credentials
81 # and new_client_authenticator should throw a ValueError
def test_no_netlogon_connection(self):
    """new_client_authenticator must fail before any netlogon connection.

    Until a netlogon connection has been established the machine
    credentials carry no authenticator state, so asking for a client
    authenticator is expected to raise ValueError.
    """
    with self.assertRaises(ValueError):
        self.machine_creds.new_client_authenticator()
86 # Once a netlogon connection has been established,
87 # new_client_authenticator should return a value
def test_have_netlogon_connection(self):
    """Once a netlogon connection exists an authenticator is produced.

    The connection object is kept in a local so it stays alive for the
    duration of the check.
    """
    conn = self.get_netlogon_connection()
    authenticator = self.machine_creds.new_client_authenticator()
    self.assertIsNotNone(authenticator)
94 # Get an authenticator and use it on a sequence of operations requiring
def test_client_authenticator(self):
    """Drive a chain of authenticated netlogon calls from fresh authenticators.

    Every operation gets its own (current, subsequent) authenticator pair
    from get_authenticator, so the server must accept each authenticator
    in sequence: one SamLogonWithFlags followed by three GetDomainInfo
    calls.
    """
    conn = self.get_netlogon_connection()
    operations = [
        self.do_NetrLogonSamLogonWithFlags,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
    ]
    for operation in operations:
        current, subsequent = self.get_authenticator(conn)
        operation(conn, current, subsequent)
108 def test_SamLogonEx(self):
109 c = self.get_netlogon_connection()
111 logon = samlogon_logon_info(self.domain,
115 logon_level = netlogon.NetlogonNetworkTransitiveInformation
116 validation_level = netlogon.NetlogonValidationSamInfo4
120 c.netr_LogonSamLogonEx(self.server,
121 self.user_creds.get_workstation(),
126 except NTSTATUSError as e:
127 enum = ctypes.c_uint32(e.args[0]).value
128 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
129 self.fail("got wrong password error")
133 def test_SamLogonEx_no_domain(self):
134 c = self.get_netlogon_connection()
136 self.user_creds.set_domain('')
138 logon = samlogon_logon_info(self.domain,
142 logon_level = netlogon.NetlogonNetworkTransitiveInformation
143 validation_level = netlogon.NetlogonValidationSamInfo4
147 c.netr_LogonSamLogonEx(self.server,
148 self.user_creds.get_workstation(),
153 except NTSTATUSError as e:
154 enum = ctypes.c_uint32(e.args[0]).value
155 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
156 self.fail("got wrong password error")
158 self.fail("got unexpected error" + str(e))
160 def test_SamLogonExNTLM(self):
161 c = self.get_netlogon_connection()
163 logon = samlogon_logon_info(self.domain,
166 flags=CLI_CRED_NTLM_AUTH)
168 logon_level = netlogon.NetlogonNetworkTransitiveInformation
169 validation_level = netlogon.NetlogonValidationSamInfo4
173 c.netr_LogonSamLogonEx(self.server,
174 self.user_creds.get_workstation(),
179 except NTSTATUSError as e:
180 enum = ctypes.c_uint32(e.args[0]).value
181 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
182 self.fail("got wrong password error")
186 def test_SamLogonExMSCHAPv2(self):
187 c = self.get_netlogon_connection()
189 logon = samlogon_logon_info(self.domain,
192 flags=CLI_CRED_NTLM_AUTH)
194 logon.identity_info.parameter_control = MSV1_0_ALLOW_MSVCHAPV2
196 logon_level = netlogon.NetlogonNetworkTransitiveInformation
197 validation_level = netlogon.NetlogonValidationSamInfo4
201 c.netr_LogonSamLogonEx(self.server,
202 self.user_creds.get_workstation(),
207 except NTSTATUSError as e:
208 enum = ctypes.c_uint32(e.args[0]).value
209 if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
210 self.fail("got wrong password error")
# Test Credentials.encrypt_netr_crypt_password by performing a
# NetrServerPasswordSet2 and then logging on using the new password.
220 def test_encrypt_netr_password(self):
221 # Change the password
222 self.do_Netr_ServerPasswordSet2()
223 # Now use the new password to perform an operation
224 srvsvc.srvsvc("ncacn_np:%s" % (self.server),
229 # Change the current machine account password with a
230 # netr_ServerPasswordSet2 call.
233 def do_Netr_ServerPasswordSet2(self):
234 c = self.get_netlogon_connection()
235 (authenticator, subsequent) = self.get_authenticator(c)
238 newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
239 encoded = newpass.encode('utf-16-le')
240 pwd_len = len(encoded)
241 filler = [ord(x) for x in os.urandom(DATA_LEN - pwd_len)]
242 pwd = netlogon.netr_CryptPassword()
244 pwd.data = filler + [ord(x) for x in encoded]
245 self.machine_creds.encrypt_netr_crypt_password(pwd)
246 c.netr_ServerPasswordSet2(self.server,
247 self.machine_creds.get_workstation(),
253 self.machine_pass = newpass
254 self.machine_creds.set_password(newpass)
256 # Establish sealed schannel netlogon connection over TCP/IP
258 def get_netlogon_connection(self):
259 return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
264 # Create the machine account
265 def create_machine_account(self):
266 self.machine_pass = samba.generate_random_password(32, 32)
267 self.machine_name = MACHINE_NAME
268 self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
270 # remove the account if it exists, this will happen if a previous test
272 delete_force(self.ldb, self.machine_dn)
275 '"' + self.machine_pass.encode('utf-8') + '"', 'utf-8'
276 ).encode('utf-16-le')
278 "dn": self.machine_dn,
279 "objectclass": "computer",
280 "sAMAccountName": "%s$" % self.machine_name,
281 "userAccountControl":
282 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
283 "unicodePwd": utf16pw})
285 self.machine_creds = Credentials()
286 self.machine_creds.guess(self.get_loadparm())
287 self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
288 self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
289 self.machine_creds.set_password(self.machine_pass)
290 self.machine_creds.set_username(self.machine_name + "$")
291 self.machine_creds.set_workstation(self.machine_name)
294 # Create a test user account
295 def create_user_account(self):
296 self.user_pass = samba.generate_random_password(32, 32)
297 self.user_name = USER_NAME
298 self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
300 # remove the account if it exists, this will happen if a previous test
302 delete_force(self.ldb, self.user_dn)
305 '"' + self.user_pass.encode('utf-8') + '"', 'utf-8'
306 ).encode('utf-16-le')
309 "objectclass": "user",
310 "sAMAccountName": "%s" % self.user_name,
311 "userAccountControl": str(UF_NORMAL_ACCOUNT),
312 "unicodePwd": utf16pw})
314 self.user_creds = Credentials()
315 self.user_creds.guess(self.get_loadparm())
316 self.user_creds.set_password(self.user_pass)
317 self.user_creds.set_username(self.user_name)
318 self.user_creds.set_workstation(self.machine_name)
322 # Get the authenticator from the machine creds.
def get_authenticator(self, c):
    """Return a (current, subsequent) netr_Authenticator pair for one call.

    The current authenticator is populated from the credential bytes and
    timestamp that the machine credentials compute in
    new_client_authenticator; the subsequent authenticator is returned as
    a freshly constructed, unpopulated netr_Authenticator.  The netlogon
    connection ``c`` is accepted for call symmetry and not used here.
    """
    client_auth = self.machine_creds.new_client_authenticator()

    current = netr_Authenticator()
    current.cred.data = list(map(ord, client_auth["credential"]))
    current.timestamp = client_auth["timestamp"]

    subsequent = netr_Authenticator()
    return current, subsequent
332 def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
333 logon = samlogon_logon_info(self.domain,
337 logon_level = netlogon.NetlogonNetworkTransitiveInformation
338 validation_level = netlogon.NetlogonValidationSamInfo4
340 c.netr_LogonSamLogonWithFlags(self.server,
341 self.user_creds.get_workstation(),
349 def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
350 query = netr_WorkstationInformation()
352 c.netr_LogonGetDomainInfo(self.server,
353 self.user_creds.get_workstation(),
360 # Build the logon data required by NetrLogonSamLogonWithFlags
363 def samlogon_logon_info(domain_name, computer_name, creds,
364 flags=CLI_CRED_NTLMv2_AUTH):
366 target_info_blob = samlogon_target(domain_name, computer_name)
368 challenge = b"abcdefgh"
369 # User account under test
370 response = creds.get_ntlm_response(flags=flags,
372 target_info=target_info_blob)
374 logon = netlogon.netr_NetworkInfo()
376 logon.challenge = [ord(x) for x in challenge]
377 logon.nt = netlogon.netr_ChallengeResponse()
378 logon.nt.length = len(response["nt_response"])
379 logon.nt.data = [ord(x) for x in response["nt_response"]]
380 logon.identity_info = netlogon.netr_IdentityInfo()
382 (username, domain) = creds.get_ntlm_username_domain()
383 logon.identity_info.domain_name.string = domain
384 logon.identity_info.account_name.string = username
385 logon.identity_info.workstation.string = creds.get_workstation()
390 # Build the samlogon target info.
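def samlogon_target(domain_name, computer_name):
    """Build and NDR-pack the NTLMSSP target-info (AV_PAIR_LIST) blob.

    The list carries, in order, the NetBIOS domain name, the NetBIOS
    computer name and the terminating MsvAvEOL pair.  The list's ``count``
    is derived from the pairs actually appended rather than hard-coded, so
    the two cannot drift apart if a pair is added later.

    :param domain_name: value for the MsvAvNbDomainName pair
    :param computer_name: value for the MsvAvNbComputerName pair
    :return: the NDR-packed AV_PAIR_LIST as bytes
    """
    def named_pair(av_id, value):
        # One AV_PAIR carrying a value; EOL is built separately below
        # because it has no Value.
        pair = ntlmssp.AV_PAIR()
        pair.AvId = av_id
        pair.Value = value
        return pair

    eol = ntlmssp.AV_PAIR()
    eol.AvId = ntlmssp.MsvAvEOL

    pairs = [
        named_pair(ntlmssp.MsvAvNbDomainName, domain_name),
        named_pair(ntlmssp.MsvAvNbComputerName, computer_name),
        eol,
    ]

    target_info = ntlmssp.AV_PAIR_LIST()
    target_info.count = len(pairs)
    target_info.pair = pairs
    return ndr_pack(target_info)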