dsdb: audit samdb and password changes
[nivanova/samba-autobuild/.git] / python / samba / tests / audit_log_pass_change.py
1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the SamDb logging of password changes.
20 """
21
22 import samba.tests
23 from samba.dcerpc.messaging import MSG_DSDB_PWD_LOG, DSDB_PWD_EVENT_NAME
24 from samba.samdb import SamDB
25 from samba.auth import system_session
26 import os
27 from samba.tests.audit_log_base import AuditLogTestBase
28 from samba.tests import delete_force
29 from samba.net import Net
30 from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
31
# Primary test account: setUp() deletes and re-adds it before every test,
# using a freshly generated random password as its "userPassword".
USER_NAME = "auditlogtestuser"
USER_PASS = samba.generate_random_password(32, 32)

# Secondary account: setUp() only deletes it; it is created on demand by
# test_net_set_password_user_without_permission.
SECOND_USER_NAME = "auditlogtestuser02"
SECOND_USER_PASS = samba.generate_random_password(32, 32)
37
38
class AuditLogPassChangeTests(AuditLogTestBase):
    """Tests for the password-change audit messages logged by the SamDB.

    Each test performs one password operation against the server named by
    the SERVER environment variable (an LDAP add/modify, or a
    change/set password call through samba.net.Net) and then checks the
    "passwordChange" section of the audit message received for the
    affected DN: the action ("Change" or "Reset"), the DN, the remote
    address, the session id, the service description and the transaction
    id.

    Required environment variables: CLIENT_IP, SERVER_IP and SERVER.
    """

    def setUp(self):
        """Connect to the server and (re)create the primary test user.

        Server settings changed here ("dSHeuristics" and "minPwdAge") are
        restored through addCleanup().
        """
        # Set before calling the base setUp().
        # NOTE(review): presumably AuditLogTestBase.setUp() uses these to
        # select which messages to listen for -- confirm in the base class.
        self.message_type = MSG_DSDB_PWD_LOG
        self.event_type = DSDB_PWD_EVENT_NAME
        super(AuditLogPassChangeTests, self).setUp()

        self.remoteAddress = os.environ["CLIENT_IP"]
        self.server_ip = os.environ["SERVER_IP"]
        self.server = os.environ["SERVER"]

        host = "ldap://%s" % self.server
        self.ldb = SamDB(url=host,
                         session_info=system_session(),
                         credentials=self.get_credentials(),
                         lp=self.get_loadparm())

        # Gets back the basedn
        self.base_dn = self.ldb.domain_dn()

        # Get the old "dSHeuristics" if it was set
        dsheuristics = self.ldb.get_dsheuristics()

        # Set the "dSHeuristics" to activate the correct "userPassword"
        # behaviour
        self.ldb.set_dsheuristics("000000001")

        # Reset the "dSHeuristics" as they were before
        self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)

        # Get the old "minPwdAge"
        minPwdAge = self.ldb.get_minPwdAge()

        # Set it temporarily to "0"
        self.ldb.set_minPwdAge("0")

        # Reset the "minPwdAge" as it was before
        self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)

        # (Re)adds the test user USER_NAME with password USER_PASS
        delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
        delete_force(
            self.ldb,
            "cn=" + SECOND_USER_NAME + ",cn=users," + self.base_dn)
        self.ldb.add({
            "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
            "objectclass": "user",
            "sAMAccountName": USER_NAME,
            "userPassword": USER_PASS
        })

    #
    # Discard the messages from the setup code
    #
    def discardSetupMessages(self, dn):
        """Wait for the one message caused by setUp()'s add, then drop it.

        :param dn: DN of the user that setUp() added.
        """
        self.waitForMessages(1, dn=dn)
        self.discardMessages()

    def tearDown(self):
        """Delegate to the base class tear down."""
        super(AuditLogPassChangeTests, self).tearDown()

    def test_net_change_password(self):
        """Net.change_password() with the old password logs a "Change"."""

        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"

        net.change_password(newpassword=password.encode('utf-8'),
                            username=USER_NAME,
                            oldpassword=USER_PASS)

        messages = self.waitForMessages(1, net, dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")
        audit = messages[0]["passwordChange"]
        self.assertEqual("Change", audit["action"])
        self.assertEqual(dn, audit["dn"])
        # Literal substring check, so the dots of the IP are not treated
        # as regular expression wildcards.
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "DCE/RPC")
        self.assertTrue(self.is_guid(audit["transactionId"]))

    def test_net_set_password_user_without_permission(self):
        """A rejected Net.set_password() is logged as a failed "Reset".

        The second test user is created first (which itself logs a
        successful "Reset" over LDAP); that user then tries to set the
        primary user's password, which must raise and be audited with
        ERR_INSUFFICIENT_ACCESS_RIGHTS.
        """

        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        self.ldb.newuser(SECOND_USER_NAME, SECOND_USER_PASS)

        #
        # Get the password reset from the user add
        #
        dn = "CN=" + SECOND_USER_NAME + ",CN=Users," + self.base_dn
        messages = self.waitForMessages(1, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")

        audit = messages[0]["passwordChange"]
        self.assertEqual("Reset", audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")
        self.assertTrue(self.is_guid(audit["transactionId"]))
        self.assertEqual(0, audit["statusCode"])
        self.assertEqual("Success", audit["status"])
        self.discardMessages()

        creds = self.insta_creds(
            template=self.get_credentials(),
            username=SECOND_USER_NAME,
            userpass=SECOND_USER_PASS,
            kerberos_state=None)

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        try:
            net.set_password(newpassword=password.encode('utf-8'),
                             account_name=USER_NAME,
                             domain_name=domain)
        except Exception:
            # Expected: the second user may not reset another user's
            # password. The audit record is checked below.
            pass
        else:
            # Kept outside the try block: fail() raises AssertionError,
            # which "except Exception" would otherwise swallow.
            self.fail("Expected exception not thrown")

        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        messages = self.waitForMessages(1, net, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")

        audit = messages[0]["passwordChange"]
        self.assertEqual("Reset", audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "DCE/RPC")
        self.assertTrue(self.is_guid(audit["transactionId"]))
        self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, audit["statusCode"])
        self.assertEqual("insufficient access rights", audit["status"])

    def test_net_set_password(self):
        """Net.set_password() with the default credentials logs a "Reset"."""

        dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
        self.discardSetupMessages(dn)

        creds = self.insta_creds(template=self.get_credentials())

        lp = self.get_loadparm()
        net = Net(creds, lp, server=self.server)
        password = "newPassword!!42"
        domain = lp.get("workgroup")

        net.set_password(newpassword=password.encode('utf-8'),
                         account_name=USER_NAME,
                         domain_name=domain)

        messages = self.waitForMessages(1, net, dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")

        audit = messages[0]["passwordChange"]
        self.assertEqual("Reset", audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "DCE/RPC")
        self.assertTrue(self.is_guid(audit["transactionId"]))

    def test_ldap_change_password(self):
        """An LDAP delete+add of "userPassword" logs a "Change"."""

        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "delete: userPassword\n" +
            "userPassword: " + USER_PASS + "\n" +
            "add: userPassword\n" +
            "userPassword: " + new_password + "\n")

        messages = self.waitForMessages(1, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")

        audit = messages[0]["passwordChange"]
        self.assertEqual("Change", audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")
        self.assertTrue(self.is_guid(audit["transactionId"]))

    def test_ldap_replace_password(self):
        """An LDAP replace of "userPassword" logs a "Reset"."""

        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        self.discardSetupMessages(dn)

        new_password = samba.generate_random_password(32, 32)
        self.ldb.modify_ldif(
            "dn: " + dn + "\n" +
            "changetype: modify\n" +
            "replace: userPassword\n" +
            "userPassword: " + new_password + "\n")

        messages = self.waitForMessages(1, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")

        audit = messages[0]["passwordChange"]
        self.assertEqual("Reset", audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        self.assertTrue(self.is_guid(audit["sessionId"]))
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")
        self.assertTrue(self.is_guid(audit["transactionId"]))

    def test_ldap_add_user(self):
        """The LDAP add performed by setUp() logs a "Reset"."""

        # The setup code adds a user, so we check for the password event
        # generated by it.
        dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
        messages = self.waitForMessages(1, dn=dn)
        print("Received %d messages" % len(messages))
        self.assertEqual(1,
                         len(messages),
                         "Did not receive the expected number of messages")

        #
        # The first message should be the reset from the Setup code.
        #
        audit = messages[0]["passwordChange"]
        self.assertEqual("Reset", audit["action"])
        self.assertEqual(dn, audit["dn"])
        self.assertIn(self.remoteAddress, audit["remoteAddress"])
        session_id = self.get_session()
        self.assertEqual(session_id, audit["sessionId"])
        service_description = self.get_service_description()
        self.assertEqual(service_description, "LDAP")
        self.assertTrue(self.is_guid(audit["sessionId"]))
        self.assertTrue(self.is_guid(audit["transactionId"]))