b47cf9ebc45c4dda47a72289307056200b465ddc
[metze/samba/wip.git] / python / samba / tests / py_credentials.py
1 # Integration tests for pycredentials
2 #
3 # Copyright (C) Catalyst IT Ltd. 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 from samba.tests import TestCase, delete_force
19 import os
20
21 import samba
22 from samba.auth import system_session
23 from samba.credentials import Credentials, CLI_CRED_NTLMv2_AUTH
24 from samba.dcerpc import netlogon, ntlmssp
25 from samba.dcerpc.netlogon import netr_Authenticator, netr_WorkstationInformation
26 from samba.dcerpc.misc import SEC_CHAN_WKSTA
27 from samba.dsdb import (
28     UF_WORKSTATION_TRUST_ACCOUNT,
29     UF_PASSWD_NOTREQD,
30     UF_NORMAL_ACCOUNT)
31 from samba.ndr import ndr_pack
32 from samba.samdb import SamDB
33 """
34 Integration tests for pycredentials
35 """
36
# Name of the machine (computer) account created for each test.
MACHINE_NAME = "PCTM"
# Name of the user account created for each test.
USER_NAME = "PCTU"
39
class PyCredentialsTests(TestCase):
    # Each test runs against the server named by the SERVER, DOMAIN and
    # SERVER_IP environment variables, using a freshly created machine
    # account (MACHINE_NAME) and user account (USER_NAME).

    #
    # Connect to the directory over LDAP and create the test accounts
    def setUp(self):
        super(PyCredentialsTests, self).setUp()

        self.server = os.environ["SERVER"]
        self.domain = os.environ["DOMAIN"]
        self.host = os.environ["SERVER_IP"]
        self.lp = self.get_loadparm()

        self.credentials = self.get_credentials()

        self.session = system_session()
        self.ldb = SamDB(url="ldap://%s" % self.host,
                         session_info=self.session,
                         credentials=self.credentials,
                         lp=self.lp)

        self.create_machine_account()
        self.create_user_account()

    #
    # Remove the accounts created by setUp
    def tearDown(self):
        super(PyCredentialsTests, self).tearDown()
        delete_force(self.ldb, self.machine_dn)
        delete_force(self.ldb, self.user_dn)

    #
    # Convert a byte string into a list of integer byte values.
    # bytearray yields integers on both Python 2 and Python 3, whereas
    # ord() applied to the elements of a bytes object only works on Python 2.
    @staticmethod
    def _byte_values(data):
        return list(bytearray(data))

    #
    # Build the value stored in the unicodePwd attribute: the password
    # wrapped in double quotes and encoded as UTF-16-LE.
    @staticmethod
    def _quoted_utf16(password):
        return ('"' + password + '"').encode('utf-16-le')

    # Until a successful netlogon connection has been established there will
    # not be a valid authenticator associated with the credentials
    # and new_client_authenticator should throw a ValueError
    def test_no_netlogon_connection(self):
        self.assertRaises(ValueError,
                          self.machine_creds.new_client_authenticator)

    # Once a netlogon connection has been established,
    # new_client_authenticator should return a value
    #
    def test_have_netlogon_connection(self):
        # The connection object is not used further, but a reference is
        # held for the duration of the test, as in the original code.
        c = self.get_netlogon_connection()
        self.assertIsNotNone(self.machine_creds.new_client_authenticator())

    # Get an authenticator and use it on a sequence of operations requiring
    # an authenticator
    def test_client_authenticator(self):
        c = self.get_netlogon_connection()
        (authenticator, subsequent) = self.get_authenticator(c)
        self.do_NetrLogonSamLogonWithFlags(c, authenticator, subsequent)
        # A new authenticator is obtained before each of the following calls.
        for _ in range(3):
            (authenticator, subsequent) = self.get_authenticator(c)
            self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)

    # Test Credentials.encrypt_netr_crypt_password
    # by performing a NetrServerPasswordSet2
    # and then logging on using the new password.
    def test_encrypt_netr_password(self):
        # Change the password
        self.do_Netr_ServerPasswordSet2()
        # Now use the new password to perform an operation
        self.do_DsrEnumerateDomainTrusts()

    # Change the current machine account password with a
    # netr_ServerPasswordSet2 call.
    def do_Netr_ServerPasswordSet2(self):
        c = self.get_netlogon_connection()
        (authenticator, subsequent) = self.get_authenticator(c)
        PWD_LEN = 32
        DATA_LEN = 512
        newpass = samba.generate_random_password(PWD_LEN, PWD_LEN)
        # The password travels as UTF-16-LE text at the end of the buffer,
        # and the length field counts bytes, not characters.
        # NOTE(review): based on the NL_TRUST_PASSWORD layout in MS-NRPC;
        # confirm against the server implementation.
        encoded = newpass.encode('utf-16-le')
        # Random filler pads the buffer up to DATA_LEN bytes in total.
        filler = self._byte_values(os.urandom(DATA_LEN - len(encoded)))
        pwd = netlogon.netr_CryptPassword()
        pwd.length = len(encoded)
        pwd.data = filler + self._byte_values(encoded)
        self.machine_creds.encrypt_netr_crypt_password(pwd)
        c.netr_ServerPasswordSet2(self.server,
                                  self.machine_creds.get_workstation(),
                                  SEC_CHAN_WKSTA,
                                  self.machine_name,
                                  authenticator,
                                  pwd)

        # Keep the local view of the password in step with the server.
        self.machine_pass = newpass
        self.machine_creds.set_password(newpass)

    # Perform a DsrEnumerateDomainTrusts, this provides confirmation that
    # a netlogon connection has been correctly established
    def do_DsrEnumerateDomainTrusts(self):
        c = self.get_netlogon_connection()
        # Only the success of the call matters; the result is discarded.
        c.netr_DsrEnumerateDomainTrusts(
            self.server,
            netlogon.NETR_TRUST_FLAG_IN_FOREST |
            netlogon.NETR_TRUST_FLAG_OUTBOUND |
            netlogon.NETR_TRUST_FLAG_INBOUND)

    # Establish sealed schannel netlogon connection over TCP/IP
    #
    def get_netlogon_connection(self):
        return netlogon.netlogon("ncacn_ip_tcp:%s[schannel,seal]" % self.server,
                                 self.lp,
                                 self.machine_creds)

    #
    # Create the machine account
    def create_machine_account(self):
        self.machine_pass = samba.generate_random_password(32, 32)
        self.machine_name = MACHINE_NAME
        self.machine_dn = "cn=%s,%s" % (self.machine_name, self.ldb.domain_dn())

        # remove the account if it exists, this will happen if a previous test
        # run failed
        delete_force(self.ldb, self.machine_dn)

        self.ldb.add({
            "dn": self.machine_dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.machine_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": self._quoted_utf16(self.machine_pass)})

        # Credentials used by the tests to act as the machine account.
        self.machine_creds = Credentials()
        self.machine_creds.guess(self.get_loadparm())
        self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        self.machine_creds.set_password(self.machine_pass)
        self.machine_creds.set_username(self.machine_name + "$")
        self.machine_creds.set_workstation(self.machine_name)

    #
    # Create a test user account
    def create_user_account(self):
        self.user_pass = samba.generate_random_password(32, 32)
        self.user_name = USER_NAME
        self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())

        # remove the account if it exists, this will happen if a previous test
        # run failed
        delete_force(self.ldb, self.user_dn)

        self.ldb.add({
            "dn": self.user_dn,
            "objectclass": "user",
            "sAMAccountName": "%s" % self.user_name,
            "userAccountControl": str(UF_NORMAL_ACCOUNT),
            "unicodePwd": self._quoted_utf16(self.user_pass)})

        # Credentials for the test user; the workstation is the machine
        # account created by create_machine_account.
        self.user_creds = Credentials()
        self.user_creds.guess(self.get_loadparm())
        self.user_creds.set_password(self.user_pass)
        self.user_creds.set_username(self.user_name)
        self.user_creds.set_workstation(self.machine_name)

    #
    # Get the authenticator from the machine creds.
    # Returns a (current, subsequent) pair of netr_Authenticator objects:
    # "current" is filled in from the credentials, "subsequent" is left empty.
    # The connection argument c is accepted but not used.
    def get_authenticator(self, c):
        auth = self.machine_creds.new_client_authenticator()
        current = netr_Authenticator()
        current.cred.data = self._byte_values(auth["credential"])
        current.timestamp = auth["timestamp"]

        subsequent = netr_Authenticator()
        return (current, subsequent)

    #
    # Log the test user on with netr_LogonSamLogonWithFlags over connection c
    def do_NetrLogonSamLogonWithFlags(self, c, current, subsequent):
        logon = samlogon_logon_info(self.domain,
                                    self.machine_name,
                                    self.user_creds)

        logon_level = netlogon.NetlogonNetworkTransitiveInformation
        validation_level = netlogon.NetlogonValidationSamInfo4
        netr_flags = 0
        c.netr_LogonSamLogonWithFlags(self.server,
                                      self.user_creds.get_workstation(),
                                      current,
                                      subsequent,
                                      logon_level,
                                      logon,
                                      validation_level,
                                      netr_flags)

    #
    # Issue a netr_LogonGetDomainInfo call over connection c using an
    # empty netr_WorkstationInformation query
    def do_NetrLogonGetDomainInfo(self, c, current, subsequent):
        query = netr_WorkstationInformation()

        c.netr_LogonGetDomainInfo(self.server,
                                  self.user_creds.get_workstation(),
                                  current,
                                  subsequent,
                                  2,
                                  query)
240
241 #
242 # Build the logon data required by NetrLogonSamLogonWithFlags
def samlogon_logon_info(domain_name, computer_name, creds):
    """Build the netr_NetworkInfo logon data for NetrLogonSamLogonWithFlags.

    domain_name and computer_name are packed into the NTLM target info.
    creds supplies the NTLMv2 response, the account and domain names and
    the workstation name.

    Returns the populated netlogon.netr_NetworkInfo object.
    """

    def byte_values(data):
        # bytearray yields integers on both Python 2 and Python 3, whereas
        # ord() applied to the elements of a bytes object only works on
        # Python 2.
        return list(bytearray(data))

    target_info_blob = samlogon_target(domain_name, computer_name)

    # Fixed challenge; the response below is computed against it.
    challenge = b"abcdefgh"
    # User account under test
    response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
                                       challenge=challenge,
                                       target_info=target_info_blob)
    nt_response = response["nt_response"]

    logon = netlogon.netr_NetworkInfo()

    logon.challenge = byte_values(challenge)
    logon.nt = netlogon.netr_ChallengeResponse()
    logon.nt.length = len(nt_response)
    logon.nt.data = byte_values(nt_response)
    logon.identity_info = netlogon.netr_IdentityInfo()

    (username, domain) = creds.get_ntlm_username_domain()
    logon.identity_info.domain_name.string = domain
    logon.identity_info.account_name.string = username
    logon.identity_info.workstation.string = creds.get_workstation()

    return logon
267
268 #
269 # Build the samlogon target info.
def samlogon_target(domain_name, computer_name):
    """Return the NDR-packed AV_PAIR_LIST used as NTLM target info.

    The list holds the domain name pair, the computer name pair and a
    closing MsvAvEOL pair, in that order.
    """
    nb_domain = ntlmssp.AV_PAIR()
    nb_domain.AvId = ntlmssp.MsvAvNbDomainName
    nb_domain.Value = domain_name

    nb_computer = ntlmssp.AV_PAIR()
    nb_computer.AvId = ntlmssp.MsvAvNbComputerName
    nb_computer.Value = computer_name

    terminator = ntlmssp.AV_PAIR()
    terminator.AvId = ntlmssp.MsvAvEOL

    pairs = [nb_domain, nb_computer, terminator]

    av_list = ntlmssp.AV_PAIR_LIST()
    av_list.count = len(pairs)
    av_list.pair = pairs

    return ndr_pack(av_list)