samba python tests: convert print func to be py2/py3 compatible
[kai/samba-autobuild/.git] / python / samba / tests / auth_log_pass_change.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
"""Tests for the Auth and AuthZ logging of password changes.
"""

# The docstring has to be the first statement of the module to be picked up
# as its __doc__; a __future__ import is still valid directly after it.
from __future__ import print_function
21
22 from samba import auth
23 import samba.tests
24 from samba.messaging import Messaging
25 from samba.samdb import SamDB
26 from samba.auth import system_session
27 import json
28 import os
29 import samba.tests.auth_log_base
30 from samba.tests import delete_force
31 from samba.net import Net
32 from samba import ntstatus
33 import samba
34 from subprocess import call
35 from ldb import LdbError
36
# Account that setUp (re)creates and whose password the tests try to change.
USER_NAME = "authlogtestuser"
# Randomly generated password for that account; both length arguments are 32.
USER_PASS = samba.generate_random_password(32, 32)
39
class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
    """Check the auth log messages produced by password change attempts.

    Every test triggers a password change (through Net.change_password,
    the "net rap password" command or an LDAP modify), waits for the
    message that marks the end of the exchange and then checks how many
    messages were collected in total.
    """

    def setUp(self):
        """Connect to the server and (re)create the test user.

        Reads CLIENT_IP, SERVER_IP and SERVER from the environment.  The
        dSHeuristics and minPwdAge settings are changed for the duration
        of the test and restored through addCleanup.
        """
        super(AuthLogPassChangeTests, self).setUp()

        self.remoteAddress = os.environ["CLIENT_IP"]
        self.server_ip = os.environ["SERVER_IP"]

        host = "ldap://%s" % os.environ["SERVER"]
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())
        print("ldb %s" % type(self.ldb))

        # The domain DN is needed to build the DN of the test user.
        self.base_dn = self.ldb.domain_dn()
        print("base_dn %s" % self.base_dn)

        # Remember the old "dSHeuristics" (if set), activate the correct
        # "userPassword" behaviour and restore the old value afterwards.
        dsheuristics = self.ldb.get_dsheuristics()
        self.ldb.set_dsheuristics("000000001")
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Temporarily set "minPwdAge" to "0" and restore it afterwards.
        min_pwd_age = self.ldb.get_minPwdAge()
        self.ldb.set_minPwdAge("0")
        self.addCleanup(self.ldb.set_minPwdAge, min_pwd_age)

        # (Re)adds the test user USER_NAME with password USER_PASS
        user_dn = self._user_dn(USER_NAME)
        delete_force(self.ldb, user_dn)
        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })

        # discard any auth log messages for the password setup
        self.discardMessages()

    def tearDown(self):
        """Defer to the base class; the settings changed in setUp are
        restored by the cleanups registered there."""
        super(AuthLogPassChangeTests, self).tearDown()

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def _user_dn(self, username):
        """Return the DN of *username* in cn=users of the domain."""
        return "cn=%s,cn=users,%s" % (username, self.base_dn)

    @staticmethod
    def _authentication_matcher(status, service_description,
                                auth_description):
        """Build a predicate for waitForMessages.

        The returned function is true for a message of type
        "Authentication" whose status, serviceDescription and
        authDescription equal the given values.
        """
        def is_last_expected_message(msg):
            if msg["type"] != "Authentication":
                return False
            details = msg["Authentication"]
            return (details["status"] == status and
                    details["serviceDescription"] == service_description and
                    details["authDescription"] == auth_description)
        return is_last_expected_message

    def _samr_change_matcher(self, status):
        """Predicate for a samr_ChangePasswordUser3 message with *status*."""
        return self._authentication_matcher(status,
                                            "SAMR Password Change",
                                            "samr_ChangePasswordUser3")

    def _ldap_change_matcher(self, status):
        """Predicate for an "LDAP Modify" password change with *status*."""
        return self._authentication_matcher(status,
                                            "LDAP Password Change",
                                            "LDAP Modify")

    def _net(self):
        """Return a Net object for self.server_ip using fresh credentials
        derived from the test credentials."""
        creds = self.insta_creds(template=self.get_credentials())
        lp = self.get_loadparm()
        return Net(creds, lp, server=self.server_ip)

    def _change_password_ldif(self, username, old_password, new_password):
        """Return the LDIF that replaces *old_password* of *username* by
        *new_password* (delete of the old value plus add of the new)."""
        return ("dn: %s\n"
                "changetype: modify\n"
                "delete: userPassword\n"
                "userPassword: %s\n"
                "add: userPassword\n"
                "userPassword: %s\n"
                % (self._user_dn(username), old_password, new_password))

    def _assert_message_count(self, expected, is_last_expected_message):
        """Wait for the final message and check the number received."""
        messages = self.waitForMessages(is_last_expected_message)
        print("Received %d messages" % len(messages))
        self.assertEqual(expected,
                         len(messages),
                         "Did not receive the expected number of messages")

    def _assert_samr_change_fails(self, status, newpassword, oldpassword,
                                  username):
        """Attempt a SAMR password change that must raise, then check
        that the exchange ends with *status* after 8 messages."""
        is_last_expected_message = self._samr_change_matcher(status)
        net = self._net()

        # Any exception counts as the expected failure; the exact type
        # raised by Net.change_password is not checked here.
        with self.assertRaises(Exception):
            net.change_password(newpassword=newpassword.encode('utf-8'),
                                oldpassword=oldpassword,
                                username=username)

        self._assert_message_count(8, is_last_expected_message)

    # ------------------------------------------------------------------
    # SAMR password changes
    # ------------------------------------------------------------------

    def test_admin_change_password(self):
        """A successful SAMR change ends with an NT_STATUS_OK message."""
        is_last_expected_message = self._samr_change_matcher("NT_STATUS_OK")
        net = self._net()
        password = "newPassword!!42"

        net.change_password(newpassword=password.encode('utf-8'),
                            username=USER_NAME,
                            oldpassword=USER_PASS)

        self._assert_message_count(8, is_last_expected_message)

    def test_admin_change_password_new_password_fails_restriction(self):
        """A new password that is refused ends with
        NT_STATUS_PASSWORD_RESTRICTION."""
        self._assert_samr_change_fails("NT_STATUS_PASSWORD_RESTRICTION",
                                       newpassword="newPassword",
                                       oldpassword=USER_PASS,
                                       username=USER_NAME)

    def test_admin_change_password_unknown_user(self):
        """A change for a non-existent user ends with
        NT_STATUS_NO_SUCH_USER."""
        self._assert_samr_change_fails("NT_STATUS_NO_SUCH_USER",
                                       newpassword="newPassword!!42",
                                       oldpassword=USER_PASS,
                                       username="badUser")

    def test_admin_change_password_bad_original_password(self):
        """A change with the wrong old password ends with
        NT_STATUS_WRONG_PASSWORD."""
        self._assert_samr_change_fails("NT_STATUS_WRONG_PASSWORD",
                                       newpassword="newPassword!!42",
                                       oldpassword="badPassword",
                                       username=USER_NAME)

    # net rap password changes are broken, but they trigger enough of the
    # server side behaviour to exercise the code paths of interest.
    # if we used the real password it would be too long and does not hash
    # correctly, so we just check it triggers the wrong password path.
    def test_rap_change_password(self):
        """Run "bin/net rap password" and expect the wrong password path."""
        is_last_expected_message = self._authentication_matcher(
            "NT_STATUS_WRONG_PASSWORD",
            "SAMR Password Change",
            "OemChangePasswordUser2")

        username = os.environ["USERNAME"]
        server = os.environ["SERVER"]
        password = os.environ["PASSWORD"]
        server_param = "--server=%s" % server
        creds = "-U%s%%%s" % (username, password)
        # The exit status is not checked: only the messages logged by the
        # server are of interest here.
        call(["bin/net", "rap", server_param,
              "password", USER_NAME, "notMyPassword", "notGoingToBeMyPassword",
              server, creds, "--option=client ipc max protocol=nt1"])

        self._assert_message_count(7, is_last_expected_message)

    # ------------------------------------------------------------------
    # LDAP password changes
    # ------------------------------------------------------------------

    def test_ldap_change_password(self):
        """A successful LDAP change ends with an NT_STATUS_OK message."""
        is_last_expected_message = self._ldap_change_matcher("NT_STATUS_OK")

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            self._change_password_ldif(USER_NAME, USER_PASS, new_password))

        self._assert_message_count(4, is_last_expected_message)

    #
    # Currently this does not get logged, so we expect to only see the log
    # entries for the underlying ldap bind.
    #
    def test_ldap_change_password_bad_user(self):
        """An LDAP change for an unknown user raises LdbError; only the
        messages up to the krb5 LDAP authorization are expected."""
        def is_last_expected_message(msg):
            return (msg["type"] == "Authorization" and
                    msg["Authorization"]["serviceDescription"] == "LDAP" and
                    msg["Authorization"]["authType"] == "krb5")

        new_password = samba.generate_random_password(32, 32)
        with self.assertRaises(LdbError):
            self.ldb.modify_ldif(
                self._change_password_ldif("badUser", USER_PASS,
                                           new_password))

        self._assert_message_count(3, is_last_expected_message)

    def test_ldap_change_password_bad_original_password(self):
        """An LDAP change with the wrong old password raises LdbError and
        ends with an NT_STATUS_WRONG_PASSWORD message."""
        is_last_expected_message = self._ldap_change_matcher(
            "NT_STATUS_WRONG_PASSWORD")

        new_password = samba.generate_random_password(32, 32)
        with self.assertRaises(LdbError):
            self.ldb.modify_ldif(
                self._change_password_ldif(USER_NAME, "badPassword",
                                           new_password))

        self._assert_message_count(4, is_last_expected_message)