dsdb: Add authentication audit logging for LDAP password change
[samba.git] / python / samba / tests / auth_log_pass_change.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for the Auth and AuthZ logging of password changes.
19 """
20
21 from samba import auth
22 import samba.tests
23 from samba.messaging import Messaging
24 from samba.samdb import SamDB
25 from samba.auth import system_session
26 import json
27 import os
28 import samba.tests.auth_log_base
29 from samba.tests import delete_force
30 from samba.net import Net
31 from samba import ntstatus
32 import samba
33 from subprocess import call
34 from ldb import LdbError
35
36 USER_NAME = "authlogtestuser"
37 USER_PASS = samba.generate_random_password(32,32)
38
39 class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
40
41     def setUp(self):
42         super(AuthLogPassChangeTests, self).setUp()
43
44         self.remoteAddress = os.environ["CLIENT_IP"]
45         self.server_ip = os.environ["SERVER_IP"]
46
47         host = "ldap://%s" % os.environ["SERVER"]
48         self.ldb = SamDB(url=host,
49                          session_info=system_session(),
50                          credentials=self.get_credentials(),
51                          lp=self.get_loadparm())
52
53         print "ldb %s" % type(self.ldb)
54         # Gets back the basedn
55         base_dn = self.ldb.domain_dn()
56         print "base_dn %s" % base_dn
57
58         # Gets back the configuration basedn
59         configuration_dn = self.ldb.get_config_basedn().get_linearized()
60
61         # Get the old "dSHeuristics" if it was set
62         dsheuristics = self.ldb.get_dsheuristics()
63
64         # Set the "dSHeuristics" to activate the correct "userPassword"
65         # behaviour
66         self.ldb.set_dsheuristics("000000001")
67
68         # Reset the "dSHeuristics" as they were before
69         self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
70
71         # Get the old "minPwdAge"
72         minPwdAge = self.ldb.get_minPwdAge()
73
74         # Set it temporarily to "0"
75         self.ldb.set_minPwdAge("0")
76         self.base_dn = self.ldb.domain_dn()
77
78         # Reset the "minPwdAge" as it was before
79         self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
80
81         # (Re)adds the test user USER_NAME with password USER_PASS
82         delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
83         self.ldb.add({
84              "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
85              "objectclass": "user",
86              "sAMAccountName": USER_NAME,
87              "userPassword": USER_PASS})
88
89         # discard any auth log messages for the password setup
90         self.discardMessages()
91
92     def tearDown(self):
93         super(AuthLogPassChangeTests, self).tearDown()
94
95
96     def test_admin_change_password(self):
97         def isLastExpectedMessage( msg):
98             return (msg["type"] == "Authentication" and
99                     msg["Authentication"]["status"]
100                         == "NT_STATUS_OK" and
101                     msg["Authentication"]["serviceDescription"]
102                         == "SAMR Password Change" and
103                     msg["Authentication"]["authDescription"]
104                         == "samr_ChangePasswordUser3")
105
106         creds = self.insta_creds(template = self.get_credentials())
107
108         lp = self.get_loadparm()
109         net = Net(creds, lp, server=self.server_ip)
110         password = "newPassword!!42"
111
112         net.change_password(newpassword = password.encode('utf-8'),
113                             username    = USER_NAME,
114                             oldpassword = USER_PASS)
115
116
117         messages = self.waitForMessages( isLastExpectedMessage)
118         print "Received %d messages" % len(messages)
119         self.assertEquals(8,
120                           len(messages),
121                           "Did not receive the expected number of messages")
122
123     def test_admin_change_password_new_password_fails_restriction(self):
124         def isLastExpectedMessage( msg):
125             return (msg["type"] == "Authentication" and
126                     msg["Authentication"]["status"]
127                         == "NT_STATUS_PASSWORD_RESTRICTION" and
128                     msg["Authentication"]["serviceDescription"]
129                         == "SAMR Password Change" and
130                     msg["Authentication"]["authDescription"]
131                         == "samr_ChangePasswordUser3")
132
133         creds = self.insta_creds(template=self.get_credentials())
134
135         lp = self.get_loadparm()
136         net = Net(creds, lp, server=self.server_ip)
137         password = "newPassword"
138
139         exception_thrown = False
140         try:
141             net.change_password(newpassword = password.encode('utf-8'),
142                                 oldpassword = USER_PASS,
143                                 username = USER_NAME)
144         except Exception, msg:
145             exception_thrown = True
146         self.assertEquals(True, exception_thrown,
147                           "Expected exception not thrown")
148
149         messages = self.waitForMessages( isLastExpectedMessage)
150         self.assertEquals(8,
151                           len(messages),
152                           "Did not receive the expected number of messages")
153
154     def test_admin_change_password_unknown_user(self):
155         def isLastExpectedMessage( msg):
156             return (msg["type"] == "Authentication" and
157                     msg["Authentication"]["status"]
158                         == "NT_STATUS_NO_SUCH_USER" and
159                     msg["Authentication"]["serviceDescription"]
160                         == "SAMR Password Change" and
161                     msg["Authentication"]["authDescription"]
162                         == "samr_ChangePasswordUser3")
163
164         creds = self.insta_creds(template=self.get_credentials())
165
166         lp = self.get_loadparm()
167         net = Net(creds, lp, server=self.server_ip)
168         password = "newPassword!!42"
169
170         exception_thrown = False
171         try:
172             net.change_password(newpassword = password.encode('utf-8'),
173                                 oldpassword = USER_PASS,
174                                 username    = "badUser")
175         except Exception, msg:
176             exception_thrown = True
177         self.assertEquals(True, exception_thrown,
178                           "Expected exception not thrown")
179
180         messages = self.waitForMessages( isLastExpectedMessage)
181         self.assertEquals(8,
182                           len(messages),
183                           "Did not receive the expected number of messages")
184
185     def test_admin_change_password_bad_original_password(self):
186         def isLastExpectedMessage( msg):
187             return (msg["type"] == "Authentication" and
188                     msg["Authentication"]["status"]
189                         == "NT_STATUS_WRONG_PASSWORD" and
190                     msg["Authentication"]["serviceDescription"]
191                         == "SAMR Password Change" and
192                     msg["Authentication"]["authDescription"]
193                         == "samr_ChangePasswordUser3")
194
195         creds = self.insta_creds(template=self.get_credentials())
196
197         lp = self.get_loadparm()
198         net = Net(creds, lp, server=self.server_ip)
199         password = "newPassword!!42"
200
201         exception_thrown = False
202         try:
203             net.change_password(newpassword = password.encode('utf-8'),
204                                 oldpassword = "badPassword",
205                                 username    = USER_NAME)
206         except Exception, msg:
207             exception_thrown = True
208         self.assertEquals(True, exception_thrown,
209                           "Expected exception not thrown")
210
211         messages = self.waitForMessages( isLastExpectedMessage)
212         self.assertEquals(8,
213                           len(messages),
214                           "Did not receive the expected number of messages")
215
216     # net rap password changes are broken, but they trigger enough of the
217     # server side behaviour to exercise the code paths of interest.
218     # if we used the real password it would be too long and does not hash
219     # correctly, so we just check it triggers the wrong password path.
220     def test_rap_change_password(self):
221         def isLastExpectedMessage( msg):
222             return (msg["type"] == "Authentication" and
223                     msg["Authentication"]["serviceDescription"]
224                         == "SAMR Password Change" and
225                     msg["Authentication"]["status"]
226                         == "NT_STATUS_WRONG_PASSWORD" and
227                     msg["Authentication"]["authDescription"]
228                         == "OemChangePasswordUser2")
229
230         username     = os.environ["USERNAME"]
231         server       = os.environ["SERVER"]
232         password     = os.environ["PASSWORD"]
233         server_param = "--server=%s" % server
234         creds        = "-U%s%%%s" % (username,password)
235         call(["bin/net", "rap", server_param,
236               "password", USER_NAME, "notMyPassword", "notGoingToBeMyPassword",
237               server, creds, "--option=client ipc max protocol=nt1"])
238
239         messages = self.waitForMessages( isLastExpectedMessage)
240         self.assertEquals(7,
241                           len(messages),
242                           "Did not receive the expected number of messages")
243
244     def test_ldap_change_password(self):
245         def isLastExpectedMessage( msg):
246             return (msg["type"] == "Authentication" and
247                     msg["Authentication"]["status"]
248                         == "NT_STATUS_OK" and
249                     msg["Authentication"]["serviceDescription"]
250                         == "LDAP Password Change" and
251                     msg["Authentication"]["authDescription"]
252                         == "LDAP Modify")
253
254         new_password = samba.generate_random_password(32,32)
255         self.ldb.modify_ldif(
256             "dn: cn=" + USER_NAME + ",cn=users," + self.base_dn + "\n" +
257             "changetype: modify\n" +
258             "delete: userPassword\n" +
259             "userPassword: " + USER_PASS + "\n" +
260             "add: userPassword\n" +
261             "userPassword: " + new_password + "\n"
262         )
263
264         messages = self.waitForMessages( isLastExpectedMessage)
265         print "Received %d messages" % len(messages)
266         self.assertEquals(4,
267                           len(messages),
268                           "Did not receive the expected number of messages")
269
270     #
271     # Currently this does not get logged, so we expect to only see the log
272     # entries for the underlying ldap bind.
273     #
274     def test_ldap_change_password_bad_user(self):
275         def isLastExpectedMessage( msg):
276             return (msg["type"] == "Authorization" and
277                     msg["Authorization"]["serviceDescription"]
278                         == "LDAP" and
279                     msg["Authorization"]["authType"] == "krb5")
280
281         new_password = samba.generate_random_password(32,32)
282         try:
283             self.ldb.modify_ldif(
284                 "dn: cn=" + "badUser" + ",cn=users," + self.base_dn + "\n" +
285                 "changetype: modify\n" +
286                 "delete: userPassword\n" +
287                 "userPassword: " + USER_PASS + "\n" +
288                 "add: userPassword\n" +
289                 "userPassword: " + new_password + "\n"
290             )
291             self.fail()
292         except LdbError, (num, msg):
293             pass
294
295         messages = self.waitForMessages( isLastExpectedMessage)
296         print "Received %d messages" % len(messages)
297         self.assertEquals(3,
298                           len(messages),
299                           "Did not receive the expected number of messages")
300
301     def test_ldap_change_password_bad_original_password(self):
302         def isLastExpectedMessage( msg):
303             return (msg["type"] == "Authentication" and
304                     msg["Authentication"]["status"]
305                         == "NT_STATUS_WRONG_PASSWORD" and
306                     msg["Authentication"]["serviceDescription"]
307                         == "LDAP Password Change" and
308                     msg["Authentication"]["authDescription"]
309                         == "LDAP Modify")
310
311         new_password = samba.generate_random_password(32,32)
312         try:
313             self.ldb.modify_ldif(
314                 "dn: cn=" + USER_NAME + ",cn=users," + self.base_dn + "\n" +
315                 "changetype: modify\n" +
316                 "delete: userPassword\n" +
317                 "userPassword: " + "badPassword" + "\n" +
318                 "add: userPassword\n" +
319                 "userPassword: " + new_password + "\n"
320             )
321             self.fail()
322         except LdbError, (num, msg):
323             pass
324
325         messages = self.waitForMessages( isLastExpectedMessage)
326         print "Received %d messages" % len(messages)
327         self.assertEquals(4,
328                           len(messages),
329                           "Did not receive the expected number of messages")