auth logging tests: Clean up flake8 warnings
[gd/samba-autobuild/.git] / python / samba / tests / auth_log_pass_change.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
"""Tests for the Auth and AuthZ logging of password changes.
"""
# The docstring has to be the first statement in the module to be picked up
# as __doc__; a __future__ import is still valid directly after it.
from __future__ import print_function
21
22 import samba.tests
23 from samba.samdb import SamDB
24 from samba.auth import system_session
25 import os
26 import samba.tests.auth_log_base
27 from samba.tests import delete_force
28 from samba.net import Net
29 import samba
30 from subprocess import call
31 from ldb import LdbError
32
# Name of the test account; setUp() uses it both as the cn and as the
# sAMAccountName of the user it (re)creates.
33 USER_NAME = "authlogtestuser"
# Initial password given to that account, generated once at import time.
# NOTE(review): the two arguments are presumably the minimum and maximum
# length (i.e. exactly 32 characters) -- confirm against
# samba.generate_random_password.
34 USER_PASS = samba.generate_random_password(32, 32)
35
36
class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
    """Check the auth log messages produced by password changes.

    Every test triggers one password change attempt (through samba.net.Net,
    through the "bin/net rap password" command, or through an LDAP modify
    of userPassword), waits for the message that is expected to end the
    sequence and then checks how many messages were received.
    """

    def setUp(self):
        """Connect to the server and (re)create the test user.

        Reads CLIENT_IP, SERVER_IP and SERVER from the environment,
        temporarily changes dSHeuristics and minPwdAge (both are restored
        by cleanups) and discards the messages caused by this setup.
        """
        super(AuthLogPassChangeTests, self).setUp()

        self.remoteAddress = os.environ["CLIENT_IP"]
        self.server_ip = os.environ["SERVER_IP"]

        host = "ldap://%s" % os.environ["SERVER"]
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())

        print("ldb %s" % type(self.ldb))
        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()
        print("base_dn %s" % self.base_dn)

        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb.get_dsheuristics()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour
        self.ldb.set_dsheuristics("000000001")

        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Get the old "minPwdAge"
        minPwdAge = self.ldb.get_minPwdAge()

        # Set it temporarily to "0"
        self.ldb.set_minPwdAge("0")

        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)

        # (Re)adds the test user USER_NAME with password USER_PASS
        user_dn = self._user_dn(USER_NAME)
        delete_force(self.ldb, user_dn)
        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })

        # discard any auth log messages for the password setup
        self.discardMessages()

    def tearDown(self):
        """Delegate to the base class; nothing extra to undo here."""
        super(AuthLogPassChangeTests, self).tearDown()

    # ------------------------------------------------------------------
    # Private helpers shared by the tests below
    # ------------------------------------------------------------------

    @staticmethod
    def _last_auth_message_matcher(status, service_description,
                                   auth_description):
        """Return a predicate matching one specific Authentication message.

        The returned callable takes a message dict and is true only when
        its "type" is "Authentication" and the "status",
        "serviceDescription" and "authDescription" entries of its
        "Authentication" dict equal the given values.
        """
        def is_last_expected_message(msg):
            # Only Authentication messages are indexed further.
            if msg["type"] != "Authentication":
                return False
            auth = msg["Authentication"]
            return (auth["status"] == status and
                    auth["serviceDescription"] == service_description and
                    auth["authDescription"] == auth_description)
        return is_last_expected_message

    def _user_dn(self, cn):
        """Return the DN of user cn under cn=users of the domain."""
        return "cn=" + cn + ",cn=users," + self.base_dn

    def _new_net(self):
        """Return a samba.net.Net for self.server_ip using fresh creds."""
        creds = self.insta_creds(template=self.get_credentials())
        lp = self.get_loadparm()
        return Net(creds, lp, server=self.server_ip)

    def _ldap_change_password(self, cn, old_password, new_password):
        """Send an LDAP modify replacing userPassword on user cn.

        The modify deletes the value old_password and adds new_password;
        whatever self.ldb.modify_ldif raises is passed on to the caller.
        """
        self.ldb.modify_ldif(
            "dn: " + self._user_dn(cn) + "\n" +
            "changetype: modify\n" +
            "delete: userPassword\n" +
            "userPassword: " + old_password + "\n" +
            "add: userPassword\n" +
            "userPassword: " + new_password + "\n")

    def _assert_message_count(self, expected, messages):
        """Print and assert the number of received messages."""
        print("Received %d messages" % len(messages))
        self.assertEqual(expected,
                         len(messages),
                         "Did not receive the expected number of messages")

    # ------------------------------------------------------------------
    # Password changes through samba.net.Net
    # ------------------------------------------------------------------

    def test_admin_change_password(self):
        """A valid change ends with an NT_STATUS_OK message."""
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_OK",
            "SAMR Password Change",
            "samr_ChangePasswordUser3")

        net = self._new_net()
        password = "newPassword!!42"

        net.change_password(newpassword=password.encode('utf-8'),
                            username=USER_NAME,
                            oldpassword=USER_PASS)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    def test_admin_change_password_new_password_fails_restriction(self):
        """The new password "newPassword" must be rejected."""
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_PASSWORD_RESTRICTION",
            "SAMR Password Change",
            "samr_ChangePasswordUser3")

        net = self._new_net()
        password = "newPassword"

        # Only the change itself is expected to raise.
        with self.assertRaises(Exception):
            net.change_password(newpassword=password.encode('utf-8'),
                                oldpassword=USER_PASS,
                                username=USER_NAME)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    def test_admin_change_password_unknown_user(self):
        """A change for the user "badUser" must fail."""
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_NO_SUCH_USER",
            "SAMR Password Change",
            "samr_ChangePasswordUser3")

        net = self._new_net()
        password = "newPassword!!42"

        with self.assertRaises(Exception):
            net.change_password(newpassword=password.encode('utf-8'),
                                oldpassword=USER_PASS,
                                username="badUser")

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    def test_admin_change_password_bad_original_password(self):
        """A change giving the old password "badPassword" must fail."""
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_WRONG_PASSWORD",
            "SAMR Password Change",
            "samr_ChangePasswordUser3")

        net = self._new_net()
        password = "newPassword!!42"

        with self.assertRaises(Exception):
            net.change_password(newpassword=password.encode('utf-8'),
                                oldpassword="badPassword",
                                username=USER_NAME)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(8, messages)

    # ------------------------------------------------------------------
    # Password change through "net rap"
    # ------------------------------------------------------------------

    # net rap password changes are broken, but they trigger enough of the
    # server side behaviour to exercise the code paths of interest.
    # if we used the real password it would be too long and does not hash
    # correctly, so we just check it triggers the wrong password path.
    def test_rap_change_password(self):
        """Run "bin/net rap ... password" and expect the wrong-password log.

        USERNAME, PASSWORD and SERVER are read from the environment. The
        exit status of the command is not checked.
        """
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_WRONG_PASSWORD",
            "SAMR Password Change",
            "OemChangePasswordUser2")

        username = os.environ["USERNAME"]
        server = os.environ["SERVER"]
        password = os.environ["PASSWORD"]
        server_param = "--server=%s" % server
        creds = "-U%s%%%s" % (username, password)
        call(["bin/net", "rap", server_param,
              "password", USER_NAME, "notMyPassword", "notGoingToBeMyPassword",
              server, creds, "--option=client ipc max protocol=nt1"])

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(7, messages)

    # ------------------------------------------------------------------
    # Password changes through LDAP modify
    # ------------------------------------------------------------------

    def test_ldap_change_password(self):
        """A valid LDAP userPassword change ends with NT_STATUS_OK."""
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_OK",
            "LDAP Password Change",
            "LDAP Modify")

        new_password = samba.generate_random_password(32, 32)
        self._ldap_change_password(USER_NAME, USER_PASS, new_password)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(4, messages)

    #
    # Currently this does not get logged, so we expect to only see the log
    # entries for the underlying ldap bind.
    #
    def test_ldap_change_password_bad_user(self):
        """An LDAP change on the user "badUser" must raise LdbError."""
        def isLastExpectedMessage(msg):
            return (msg["type"] == "Authorization" and
                    msg["Authorization"]["serviceDescription"] == "LDAP" and
                    msg["Authorization"]["authType"] == "krb5")

        new_password = samba.generate_random_password(32, 32)
        with self.assertRaises(LdbError):
            self._ldap_change_password("badUser", USER_PASS, new_password)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(3, messages)

    def test_ldap_change_password_bad_original_password(self):
        """An LDAP change deleting the wrong old value must raise LdbError."""
        isLastExpectedMessage = self._last_auth_message_matcher(
            "NT_STATUS_WRONG_PASSWORD",
            "LDAP Password Change",
            "LDAP Modify")

        new_password = samba.generate_random_password(32, 32)
        with self.assertRaises(LdbError):
            self._ldap_change_password(USER_NAME, "badPassword", new_password)

        messages = self.waitForMessages(isLastExpectedMessage)
        self._assert_message_count(4, messages)