c2091e498257bc651c584a4643a637a9ea9da098
[bbaumbach/samba-autobuild/.git] / python / samba / tests / py_credentials.py
1 # Integration tests for pycredentials
2 #
3 # Copyright (C) Catalyst IT Ltd. 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 from samba.tests import TestCase, delete_force
19 import os
20
21 import samba
22 from samba.auth import system_session
23 from samba.credentials import (
24     Credentials,
25     CLI_CRED_NTLMv2_AUTH,
26     CLI_CRED_NTLM_AUTH,
27     DONT_USE_KERBEROS)
28 from samba.dcerpc import netlogon, ntlmssp, srvsvc
29 from samba.dcerpc.netlogon import (
30     netr_Authenticator,
31     netr_WorkstationInformation,
32     MSV1_0_ALLOW_MSVCHAPV2
33 )
34 from samba.dcerpc.misc import SEC_CHAN_WKSTA
35 from samba.dsdb import (
36     UF_WORKSTATION_TRUST_ACCOUNT,
37     UF_PASSWD_NOTREQD,
38     UF_NORMAL_ACCOUNT)
39 from samba.ndr import ndr_pack
40 from samba.samdb import SamDB
41 from samba import NTSTATUSError, ntstatus
42 import ctypes
43
44 """
45 Integration tests for pycredentials
46 """
47
48 MACHINE_NAME = "PCTM"
49 USER_NAME    = "PCTU"
50
51 class PyCredentialsTests(TestCase):
52
53     def setUp(self):
54         super(PyCredentialsTests, self).setUp()
55
56         self.server      = os.environ["SERVER"]
57         self.domain      = os.environ["DOMAIN"]
58         self.host        = os.environ["SERVER_IP"]
59         self.lp          = self.get_loadparm()
60
61         self.credentials = self.get_credentials()
62
63         self.session     = system_session()
64         self.ldb = SamDB(url="ldap://%s" % self.host,
65                          session_info=self.session,
66                          credentials=self.credentials,
67                          lp=self.lp)
68
69         self.create_machine_account()
70         self.create_user_account()
71
72
73     def tearDown(self):
74         super(PyCredentialsTests, self).tearDown()
75         delete_force(self.ldb, self.machine_dn)
76         delete_force(self.ldb, self.user_dn)
77
78     # Until a successful netlogon connection has been established there will
79     # not be a valid authenticator associated with the credentials
80     # and new_client_authenticator should throw a ValueError
81     def test_no_netlogon_connection(self):
82         self.assertRaises(ValueError,
83                           self.machine_creds.new_client_authenticator)
84
85     # Once a netlogon connection has been established,
86     # new_client_authenticator should return a value
87     #
88     def test_have_netlogon_connection(self):
89         c = self.get_netlogon_connection()
90         a = self.machine_creds.new_client_authenticator()
91         self.assertIsNotNone(a)
92
93     # Get an authenticator and use it on a sequence of operations requiring
94     # an authenticator
95     def test_client_authenticator(self):
96         c = self.get_netlogon_connection()
97         (authenticator, subsequent) = self.get_authenticator(c)
98         self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent)
99         (authenticator, subsequent) = self.get_authenticator(c)
100         self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
101         (authenticator, subsequent) = self.get_authenticator(c)
102         self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
103         (authenticator, subsequent) = self.get_authenticator(c)
104         self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
105
106
107     def test_SamLogonEx(self):
108         c = self.get_netlogon_connection()
109
110         logon = samlogon_logon_info(self.domain,
111                                     self.machine_name,
112                                     self.user_creds)
113
114         logon_level = netlogon.NetlogonNetworkTransitiveInformation
115         validation_level = netlogon.NetlogonValidationSamInfo4
116         netr_flags = 0
117
118         try:
119             c.netr_LogonSamLogonEx(self.server,
120                                    self.user_creds.get_workstation(),
121                                    logon_level,
122                                    logon,
123                                    validation_level,
124                                    netr_flags)
125         except NTSTATUSError as e:
126             enum = ctypes.c_uint32(e.args[0]).value
127             if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
128                 self.fail("got wrong password error")
129             else:
130                 raise
131
132     def test_SamLogonEx_no_domain(self):
133         c = self.get_netlogon_connection()
134
135         self.user_creds.set_domain('')
136
137         logon = samlogon_logon_info(self.domain,
138                                     self.machine_name,
139                                     self.user_creds)
140
141         logon_level = netlogon.NetlogonNetworkTransitiveInformation
142         validation_level = netlogon.NetlogonValidationSamInfo4
143         netr_flags = 0
144
145         try:
146             c.netr_LogonSamLogonEx(self.server,
147                                    self.user_creds.get_workstation(),
148                                    logon_level,
149                                    logon,
150                                    validation_level,
151                                    netr_flags)
152         except NTSTATUSError as e:
153             enum = ctypes.c_uint32(e.args[0]).value
154             if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
155                 self.fail("got wrong password error")
156             else:
157                 self.fail("got unexpected error" + str(e))
158
159     def test_SamLogonExNTLM(self):
160         c = self.get_netlogon_connection()
161
162         logon = samlogon_logon_info(self.domain,
163                                     self.machine_name,
164                                     self.user_creds,
165                                     flags=CLI_CRED_NTLM_AUTH)
166
167         logon_level = netlogon.NetlogonNetworkTransitiveInformation
168         validation_level = netlogon.NetlogonValidationSamInfo4
169         netr_flags = 0
170
171         try:
172             c.netr_LogonSamLogonEx(self.server,
173                                    self.user_creds.get_workstation(),
174                                    logon_level,
175                                    logon,
176                                    validation_level,
177                                    netr_flags)
178         except NTSTATUSError as e:
179             enum = ctypes.c_uint32(e.args[0]).value
180             if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
181                 self.fail("got wrong password error")
182             else:
183                 raise
184
185     def test_SamLogonExMSCHAPv2(self):
186         c = self.get_netlogon_connection()
187
188         logon = samlogon_logon_info(self.domain,
189                                     self.machine_name,
190                                     self.user_creds,
191                                     flags=CLI_CRED_NTLM_AUTH)
192
193         logon.identity_info.parameter_control = MSV1_0_ALLOW_MSVCHAPV2
194
195         logon_level = netlogon.NetlogonNetworkTransitiveInformation
196         validation_level = netlogon.NetlogonValidationSamInfo4
197         netr_flags = 0
198
199         try:
200             c.netr_LogonSamLogonEx(self.server,
201                                    self.user_creds.get_workstation(),
202                                    logon_level,
203                                    logon,
204                                    validation_level,
205                                    netr_flags)
206         except NTSTATUSError as e:
207             enum = ctypes.c_uint32(e.args[0]).value
208             if enum == ntstatus.NT_STATUS_WRONG_PASSWORD:
209                 self.fail("got wrong password error")
210             else:
211                 raise
212
213
214     # Test Credentials.encrypt_netr_crypt_password
215     # By performing a NetrServerPasswordSet2
216     # And the logging on using the new password.
217
218
219     def test_encrypt_netr_password(self):
220         # Change the password
221         self.do_Netr_ServerPasswordSet2()
222         # Now use the new password to perform an operation
223         srvsvc.srvsvc("ncacn_np:%s" % (self.server),
224                       self.lp,
225                       self.machine_creds)
226
227
228    # Change the current machine account password with a
229    # netr_ServerPasswordSet2 call.
230
231
232     def do_Netr_ServerPasswordSet2(self):
233         c = self.get_netlogon_connection()
234         (authenticator, subsequent) = self.get_authenticator(c)
235         PWD_LEN  = 32
236         DATA_LEN = 512
237         newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
238         encoded = newpass.encode('utf-16-le')
239         pwd_len = len(encoded)
240         filler  = [ord(x) for x in os.urandom(DATA_LEN - pwd_len)]
241         pwd = netlogon.netr_CryptPassword()
242         pwd.length = pwd_len
243         pwd.data = filler + [ord(x) for x in encoded]
244         self.machine_creds.encrypt_netr_crypt_password(pwd)
245         c.netr_ServerPasswordSet2(self.server,
246                                   self.machine_creds.get_workstation(),
247                                   SEC_CHAN_WKSTA,
248                                   self.machine_name,
249                                   authenticator,
250                                   pwd)
251
252         self.machine_pass = newpass
253         self.machine_creds.set_password(newpass)
254
255     # Establish sealed schannel netlogon connection over TCP/IP
256     #
257     def get_netlogon_connection(self):
258         return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
259                                  self.lp,
260                                  self.machine_creds)
261
262     #
263     # Create the machine account
264     def create_machine_account(self):
265         self.machine_pass = samba.generate_random_password(32, 32)
266         self.machine_name = MACHINE_NAME
267         self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())
268
269         # remove the account if it exists, this will happen if a previous test
270         # run failed
271         delete_force(self.ldb, self.machine_dn)
272
273         utf16pw = unicode(
274             '"' + self.machine_pass.encode('utf-8') + '"', 'utf-8'
275         ).encode('utf-16-le')
276         self.ldb.add({
277             "dn": self.machine_dn,
278             "objectclass": "computer",
279             "sAMAccountName": "%s$" % self.machine_name,
280             "userAccountControl":
281                 str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
282             "unicodePwd": utf16pw})
283
284         self.machine_creds = Credentials()
285         self.machine_creds.guess(self.get_loadparm())
286         self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
287         self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
288         self.machine_creds.set_password(self.machine_pass)
289         self.machine_creds.set_username(self.machine_name + "$")
290         self.machine_creds.set_workstation(self.machine_name)
291
292     #
293     # Create a test user account
294     def create_user_account(self):
295         self.user_pass = samba.generate_random_password(32, 32)
296         self.user_name = USER_NAME
297         self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
298
299         # remove the account if it exists, this will happen if a previous test
300         # run failed
301         delete_force(self.ldb, self.user_dn)
302
303         utf16pw = unicode(
304             '"' + self.user_pass.encode('utf-8') + '"', 'utf-8'
305         ).encode('utf-16-le')
306         self.ldb.add({
307             "dn": self.user_dn,
308             "objectclass": "user",
309             "sAMAccountName": "%s" % self.user_name,
310             "userAccountControl": str(UF_NORMAL_ACCOUNT),
311             "unicodePwd": utf16pw})
312
313         self.user_creds = Credentials()
314         self.user_creds.guess(self.get_loadparm())
315         self.user_creds.set_password(self.user_pass)
316         self.user_creds.set_username(self.user_name)
317         self.user_creds.set_workstation(self.machine_name)
318         pass
319
320     #
321     # Get the authenticator from the machine creds.
322     def get_authenticator(self, c):
323         auth = self.machine_creds.new_client_authenticator();
324         current  = netr_Authenticator()
325         current.cred.data = [ord(x) for x in auth["credential"]]
326         current.timestamp = auth["timestamp"]
327
328         subsequent = netr_Authenticator()
329         return (current, subsequent)
330
331     def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
332         logon = samlogon_logon_info(self.domain,
333                                     self.machine_name,
334                                     self.user_creds)
335
336         logon_level = netlogon.NetlogonNetworkTransitiveInformation
337         validation_level = netlogon.NetlogonValidationSamInfo4
338         netr_flags = 0
339         c.netr_LogonSamLogonWithFlags(self.server,
340                                       self.user_creds.get_workstation(),
341                                       current,
342                                       subsequent,
343                                       logon_level,
344                                       logon,
345                                       validation_level,
346                                       netr_flags)
347
348     def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
349         query = netr_WorkstationInformation()
350
351         c.netr_LogonGetDomainInfo(self.server,
352                                   self.user_creds.get_workstation(),
353                                   current,
354                                   subsequent,
355                                   2,
356                                   query)
357
358 #
359 # Build the logon data required by NetrLogonSamLogonWithFlags
360 def samlogon_logon_info(domain_name, computer_name, creds,
361                         flags=CLI_CRED_NTLMv2_AUTH):
362
363     target_info_blob = samlogon_target(domain_name, computer_name)
364
365     challenge = b"abcdefgh"
366     # User account under test
367     response = creds.get_ntlm_response(flags=flags,
368                                        challenge=challenge,
369                                        target_info=target_info_blob)
370
371     logon = netlogon.netr_NetworkInfo()
372
373     logon.challenge     = [ord(x) for x in challenge]
374     logon.nt            = netlogon.netr_ChallengeResponse()
375     logon.nt.length     = len(response["nt_response"])
376     logon.nt.data       = [ord(x) for x in response["nt_response"]]
377     logon.identity_info = netlogon.netr_IdentityInfo()
378
379     (username, domain)  = creds.get_ntlm_username_domain()
380     logon.identity_info.domain_name.string  = domain
381     logon.identity_info.account_name.string = username
382     logon.identity_info.workstation.string  = creds.get_workstation()
383
384     return logon
385
386 #
387 # Build the samlogon target info.
388 def samlogon_target(domain_name, computer_name):
389     target_info = ntlmssp.AV_PAIR_LIST()
390     target_info.count = 3
391     computername = ntlmssp.AV_PAIR()
392     computername.AvId = ntlmssp.MsvAvNbComputerName
393     computername.Value = computer_name
394
395     domainname = ntlmssp.AV_PAIR()
396     domainname.AvId = ntlmssp.MsvAvNbDomainName
397     domainname.Value = domain_name
398
399     eol = ntlmssp.AV_PAIR()
400     eol.AvId = ntlmssp.MsvAvEOL
401     target_info.pair = [domainname, computername, eol]
402
403     return ndr_pack(target_info)