1 # Integration tests for pycredentials
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from samba.tests import TestCase, delete_force
22 from samba.auth import system_session
23 from samba.credentials import Credentials, CLI_CRED_NTLMv2_AUTH
24 from samba.dcerpc import netlogon, ntlmssp
25 from samba.dcerpc.netlogon import netr_Authenticator, netr_WorkstationInformation
26 from samba.dcerpc.misc import SEC_CHAN_WKSTA
27 from samba.dsdb import (
28 UF_WORKSTATION_TRUST_ACCOUNT,
31 from samba.ndr import ndr_pack
32 from samba.samdb import SamDB
34 Integration tests for pycredentials
40 class PyCredentialsTests(TestCase):
# Integration test case for samba.credentials.Credentials.
#
# The fixture lines below read SERVER, DOMAIN and SERVER_IP from the
# environment, open a SamDB over LDAP to that host and create the machine
# and user accounts used by the tests.
#
# NOTE(review): the "def setUp(self):" header is not visible in this view;
# the lines from the super().setUp() call down to create_user_account()
# presumably form its body -- confirm against the full file.
43 super(PyCredentialsTests, self).setUp()
45 self.server = os.environ["SERVER"]
46 self.domain = os.environ["DOMAIN"]
47 self.host = os.environ["SERVER_IP"]
48 self.lp = self.get_loadparm()
50 self.credentials = self.get_credentials()
52 self.session = system_session()
# The directory connection goes to the SERVER_IP address, not to SERVER.
53 self.ldb = SamDB(url="ldap://%s" % self.host,
54 session_info=self.session,
55 credentials=self.credentials,
# NOTE(review): the end of the SamDB(...) call is not visible in this view.
# The machine account is created first; create_user_account() reads
# self.machine_name when it sets the workstation on the user credentials.
58 self.create_machine_account()
59 self.create_user_account()
# NOTE(review): the "def tearDown(self):" header is not visible in this view;
# the three lines below presumably form its body. They remove both test
# accounts from the directory.
63 super(PyCredentialsTests, self).tearDown()
64 delete_force(self.ldb, self.machine_dn)
65 delete_force(self.ldb, self.user_dn)
67 # Until a successful netlogon connection has been established there will
68 # not be a valid authenticator associated with the credentials
69 # and new_client_authenticator should throw a ValueError
def test_no_netlogon_connection(self):
    # No netlogon connection is opened in this test, so asking the machine
    # credentials for a client authenticator is expected to raise
    # ValueError.
    make_authenticator = self.machine_creds.new_client_authenticator
    with self.assertRaises(ValueError):
        make_authenticator()
74 # Once a netlogon connection has been established,
75 # new_client_authenticator should return a value
def test_have_netlogon_connection(self):
    # The connection is only established here; it is not used again
    # before the authenticator is requested from the machine credentials.
    connection = self.get_netlogon_connection()
    authenticator = self.machine_creds.new_client_authenticator()
    self.assertIsNotNone(authenticator)
82 # Get an authenticator and use it on a sequence of operations that each require one
def test_client_authenticator(self):
    connection = self.get_netlogon_connection()

    # One SamLogonWithFlags request followed by three GetDomainInfo
    # requests; a new authenticator pair is fetched before every request.
    requests = (
        self.do_NetrLogonSamLogonWithFlags,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
        self.do_NetrLogonGetDomainInfo,
    )
    for send_request in requests:
        current, returned = self.get_authenticator(connection)
        send_request(connection, current, returned)
95 # Test Credentials.encrypt_netr_crypt_password
96 # By performing a NetrServerPasswordSet2
97 # And then logging on using the new password.
98 def test_encrypt_netr_password(self):
# Exercises Credentials.encrypt_netr_crypt_password indirectly: the first
# helper encrypts a new machine password with the machine credentials and
# sends it with netr_ServerPasswordSet2, then stores the new password on
# the credentials.
# NOTE(review): one original line between the def and the first call is
# not visible in this view.
100 self.do_Netr_ServerPasswordSet2()
101 # Now use the new password to perform an operation
# The second helper opens a new netlogon connection for this operation.
102 self.do_DsrEnumerateDomainTrusts()
105 # Change the current machine account password with a
106 # netr_ServerPasswordSet2 call.
108 def do_Netr_ServerPasswordSet2(self):
# Generates a new random machine password, places it at the end of a
# netr_CryptPassword data buffer behind random filler bytes, encrypts the
# buffer with the machine credentials and sends it to the server with
# netr_ServerPasswordSet2. Afterwards self.machine_pass and
# self.machine_creds both hold the new password.
#
# NOTE(review): PWD_LEN and DATA_LEN are not defined in the lines visible
# here, and the netr_ServerPasswordSet2 argument list is cut off after the
# workstation name -- confirm against the full file.
109 c = self.get_netlogon_connection()
110 (authenticator, subsequent) = self.get_authenticator(c)
113 newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
# NOTE(review): ord() over os.urandom() output assumes Python 2 byte
# strings; on Python 3 iterating bytes yields ints and ord() would raise
# TypeError.
114 filler = [ord(x) for x in os.urandom(DATA_LEN-PWD_LEN)]
115 pwd = netlogon.netr_CryptPassword()
# Filler first, password last: the buffer is DATA_LEN entries long when the
# generated password has exactly PWD_LEN characters.
117 pwd.data = filler + [ord(x) for x in newpass]
118 self.machine_creds.encrypt_netr_crypt_password(pwd)
119 c.netr_ServerPasswordSet2(self.server,
120 self.machine_creds.get_workstation(),
# Record the new password locally so later connections use it.
126 self.machine_pass = newpass
127 self.machine_creds.set_password(newpass)
129 # Perform a DsrEnumerateDomainTrusts, this provides confirmation that
130 # a netlogon connection has been correctly established
131 def do_DsrEnumerateDomainTrusts(self):
# Opens a netlogon connection and calls netr_DsrEnumerateDomainTrusts with
# the in-forest, outbound and inbound trust flags OR-ed together.
# NOTE(review): the first argument line of the call is not visible in this
# view, and the result bound to "trusts" is not used in the visible lines.
132 c = self.get_netlogon_connection()
133 trusts = c.netr_DsrEnumerateDomainTrusts(
135 netlogon.NETR_TRUST_FLAG_IN_FOREST |
136 netlogon.NETR_TRUST_FLAG_OUTBOUND |
137 netlogon.NETR_TRUST_FLAG_INBOUND)
139 # Establish sealed schannel netlogon connection over TCP/IP
141 def get_netlogon_connection(self):
# Returns a netlogon.netlogon object for a binding string of the form
# ncacn_ip_tcp:<server>[schannel,seal], where <server> is self.server.
# NOTE(review): the remaining constructor arguments are cut off in this
# view.
142 return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
147 # Create the machine account
148 def create_machine_account(self):
# Sets up the machine account used by the tests. The visible lines:
#   * pick a random password and derive the account DN from MACHINE_NAME
#     and the domain DN,
#   * delete any entry already present at that DN,
#   * describe a "computer" object whose sAMAccountName is "<name>$" and
#     whose userAccountControl combines the workstation-trust and
#     password-not-required flags,
#   * build self.machine_creds with the workstation secure channel type,
#     the password, the "<name>$" user name and the workstation name.
#
# NOTE(review): the line that starts the utf16pw expression and the line
# that opens the call receiving the attribute dict are not visible in this
# view -- confirm against the full file.
149 self.machine_pass = samba.generate_random_password(32, 32)
150 self.machine_name = MACHINE_NAME
151 self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
153 # Remove the account if it already exists, e.g. left behind by a previous test run
155 delete_force(self.ldb, self.machine_dn)
# The password is wrapped in double quotes and encoded as UTF-16-LE before
# being used as the unicodePwd attribute value.
158 '"' + self.machine_pass.encode('utf-8') + '"', 'utf-8'
159 ).encode('utf-16-le')
161 "dn": self.machine_dn,
162 "objectclass": "computer",
163 "sAMAccountName": "%s$" % self.machine_name,
164 "userAccountControl":
165 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
166 "unicodePwd": utf16pw})
# Credentials object for the account described above.
168 self.machine_creds = Credentials()
169 self.machine_creds.guess(self.get_loadparm())
170 self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
171 self.machine_creds.set_password(self.machine_pass)
172 self.machine_creds.set_username(self.machine_name + "$")
173 self.machine_creds.set_workstation(self.machine_name)
176 # Create a test user account
177 def create_user_account(self):
# Sets up the user account used by the tests. The visible lines:
#   * pick a random password and derive the account DN from USER_NAME and
#     the domain DN,
#   * delete any entry already present at that DN,
#   * describe a "user" object with the normal-account flag,
#   * build self.user_creds with the password, the user name and the
#     machine account's name as workstation.
#
# Reads self.machine_name, so create_machine_account() must have run first.
#
# NOTE(review): the line that starts the utf16pw expression, the line that
# opens the call receiving the attribute dict and the dict's first entry
# are not visible in this view -- confirm against the full file.
178 self.user_pass = samba.generate_random_password(32, 32)
179 self.user_name = USER_NAME
180 self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
182 # Remove the account if it already exists, e.g. left behind by a previous test run
184 delete_force(self.ldb, self.user_dn)
# The password is wrapped in double quotes and encoded as UTF-16-LE before
# being used as the unicodePwd attribute value.
187 '"' + self.user_pass.encode('utf-8') + '"', 'utf-8'
188 ).encode('utf-16-le')
191 "objectclass": "user",
192 "sAMAccountName": "%s" % self.user_name,
193 "userAccountControl": str(UF_NORMAL_ACCOUNT),
194 "unicodePwd": utf16pw})
# Credentials object for the account described above.
196 self.user_creds = Credentials()
197 self.user_creds.guess(self.get_loadparm())
198 self.user_creds.set_password(self.user_pass)
199 self.user_creds.set_username(self.user_name)
200 self.user_creds.set_workstation(self.machine_name)
204 # Get the authenticator from the machine creds.
def get_authenticator(self, c):
    """Build the pair of netr_Authenticator objects for one request.

    A new client authenticator is requested from the machine credentials
    and its "credential" and "timestamp" entries are copied into the first
    netr_Authenticator. The second one is returned freshly constructed.
    The ``c`` argument is accepted but not used in this method.

    Returns a ``(current, subsequent)`` tuple.
    """
    client_auth = self.machine_creds.new_client_authenticator()

    current = netr_Authenticator()
    # ord() is applied to every element of the credential, i.e. it is
    # treated as a sequence of one-character strings.
    current.cred.data = [ord(ch) for ch in client_auth["credential"]]
    current.timestamp = client_auth["timestamp"]

    subsequent = netr_Authenticator()
    return current, subsequent
214 def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
# Builds the logon data with samlogon_logon_info() and issues a
# netr_LogonSamLogonWithFlags request on connection c, using the
# network-transitive logon level and the SamInfo4 validation level.
# NOTE(review): both the samlogon_logon_info(...) call and the
# netr_LogonSamLogonWithFlags(...) call are cut off in this view, so how
# "current" and "subsequent" are passed cannot be seen here.
215 logon = samlogon_logon_info(self.domain,
219 logon_level = netlogon.NetlogonNetworkTransitiveInformation
220 validation_level = netlogon.NetlogonValidationSamInfo4
222 c.netr_LogonSamLogonWithFlags(self.server,
223 self.user_creds.get_workstation(),
231 def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
# Issues a netr_LogonGetDomainInfo request on connection c; a
# default-constructed netr_WorkstationInformation is created as the query.
# NOTE(review): the call's argument list is cut off after the workstation
# name in this view, so how "current", "subsequent" and "query" are passed
# cannot be seen here.
232 query = netr_WorkstationInformation()
234 c.netr_LogonGetDomainInfo(self.server,
235 self.user_creds.get_workstation(),
242 # Build the logon data required by NetrLogonSamLogonWithFlags
243 def samlogon_logon_info(domain_name, computer_name, creds):
# Fills in a netlogon.netr_NetworkInfo for the account held in creds:
#   * the target info blob comes from samlogon_target(),
#   * the challenge is the fixed 8-byte value below,
#   * the NT response is taken from creds.get_ntlm_response(),
#   * the identity info (domain, account, workstation) comes from creds.
#
# NOTE(review): one argument line inside the get_ntlm_response(...) call is
# not visible in this view, and no return statement is visible either;
# callers assign this function's result, so presumably "logon" is returned
# -- confirm against the full file.
245 target_info_blob = samlogon_target(domain_name, computer_name)
247 challenge = b"abcdefgh"
248 # User account under test
249 response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
251 target_info=target_info_blob)
253 logon = netlogon.netr_NetworkInfo()
# NOTE(review): ord() over a bytes literal assumes Python 2 byte strings;
# on Python 3 iterating bytes yields ints and ord() would raise TypeError.
255 logon.challenge = [ord(x) for x in challenge]
256 logon.nt = netlogon.netr_ChallengeResponse()
257 logon.nt.length = len(response["nt_response"])
258 logon.nt.data = [ord(x) for x in response["nt_response"]]
259 logon.identity_info = netlogon.netr_IdentityInfo()
261 (username, domain) = creds.get_ntlm_username_domain()
262 logon.identity_info.domain_name.string = domain
263 logon.identity_info.account_name.string = username
264 logon.identity_info.workstation.string = creds.get_workstation()
269 # Build the samlogon target info.
def samlogon_target(domain_name, computer_name):
    """Return the NDR-packed AV_PAIR_LIST describing the logon target.

    The list carries three pairs, in this order: MsvAvNbDomainName with
    ``domain_name``, MsvAvNbComputerName with ``computer_name``, and the
    MsvAvEOL terminator, which gets no value.
    """
    def _pair_with_id(av_id):
        # New AV_PAIR with only its identifier set.
        pair = ntlmssp.AV_PAIR()
        pair.AvId = av_id
        return pair

    computer_pair = _pair_with_id(ntlmssp.MsvAvNbComputerName)
    computer_pair.Value = computer_name

    domain_pair = _pair_with_id(ntlmssp.MsvAvNbDomainName)
    domain_pair.Value = domain_name

    terminator = _pair_with_id(ntlmssp.MsvAvEOL)

    av_list = ntlmssp.AV_PAIR_LIST()
    av_list.count = 3
    # The domain entry precedes the computer entry in the packed list.
    av_list.pair = [domain_pair, computer_pair, terminator]
    return ndr_pack(av_list)