1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the SamDb logging of password changes.
23 from samba.dcerpc.messaging import MSG_GROUP_LOG, DSDB_GROUP_EVENT_NAME
24 from samba.dcerpc.windows_event_ids import (
25 EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
26 EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP
28 from samba.samdb import SamDB
29 from samba.auth import system_session
31 from samba.tests.audit_log_base import AuditLogTestBase
32 from samba.tests import delete_force
34 from ldb import FLAG_MOD_REPLACE
36 USER_NAME = "grpadttstuser01"
37 USER_PASS = samba.generate_random_password(32, 32)
39 SECOND_USER_NAME = "grpadttstuser02"
40 SECOND_USER_PASS = samba.generate_random_password(32, 32)
42 GROUP_NAME_01 = "group-audit-01"
43 GROUP_NAME_02 = "group-audit-02"
46 class GroupAuditTests(AuditLogTestBase):
49 self.message_type = MSG_GROUP_LOG
50 self.event_type = DSDB_GROUP_EVENT_NAME
51 super(GroupAuditTests, self).setUp()
53 self.remoteAddress = os.environ["CLIENT_IP"]
54 self.server_ip = os.environ["SERVER_IP"]
56 host = "ldap://%s" % os.environ["SERVER"]
57 self.ldb = SamDB(url=host,
58 session_info=system_session(),
59 credentials=self.get_credentials(),
60 lp=self.get_loadparm())
61 self.server = os.environ["SERVER"]
63 # Gets back the basedn
64 self.base_dn = self.ldb.domain_dn()
66 # Get the old "dSHeuristics" if it was set
67 dsheuristics = self.ldb.get_dsheuristics()
69 # Set the "dSHeuristics" to activate the correct "userPassword"
71 self.ldb.set_dsheuristics("000000001")
73 # Reset the "dSHeuristics" as they were before
74 self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
76 # Get the old "minPwdAge"
77 minPwdAge = self.ldb.get_minPwdAge()
79 # Set it temporarily to "0"
80 self.ldb.set_minPwdAge("0")
81 self.base_dn = self.ldb.domain_dn()
83 # Reset the "minPwdAge" as it was before
84 self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
86 # (Re)adds the test user USER_NAME with password USER_PASS
88 "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
89 "objectclass": "user",
90 "sAMAccountName": USER_NAME,
91 "userPassword": USER_PASS
93 self.ldb.newgroup(GROUP_NAME_01)
94 self.ldb.newgroup(GROUP_NAME_02)
97 super(GroupAuditTests, self).tearDown()
98 delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
99 self.ldb.deletegroup(GROUP_NAME_01)
100 self.ldb.deletegroup(GROUP_NAME_02)
102 def test_add_and_remove_users_from_group(self):
105 # Wait for the primary group change for the created user.
107 messages = self.waitForMessages(2)
108 print("Received %d messages" % len(messages))
111 "Did not receive the expected number of messages")
112 audit = messages[0]["groupChange"]
114 self.assertEqual("PrimaryGroup", audit["action"])
115 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
116 group_dn = "cn=domain users,cn=users," + self.base_dn
117 self.assertTrue(user_dn.lower(), audit["user"].lower())
118 self.assertTrue(group_dn.lower(), audit["group"].lower())
119 self.assertRegexpMatches(audit["remoteAddress"],
121 self.assertTrue(self.is_guid(audit["sessionId"]))
122 session_id = self.get_session()
123 self.assertEquals(session_id, audit["sessionId"])
124 service_description = self.get_service_description()
125 self.assertEquals(service_description, "LDAP")
127 # Check the Add message for the new users primary group
128 audit = messages[1]["groupChange"]
130 self.assertEqual("Added", audit["action"])
131 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
132 group_dn = "cn=domain users,cn=users," + self.base_dn
133 self.assertTrue(user_dn.lower(), audit["user"].lower())
134 self.assertTrue(group_dn.lower(), audit["group"].lower())
135 self.assertRegexpMatches(audit["remoteAddress"],
137 self.assertTrue(self.is_guid(audit["sessionId"]))
138 session_id = self.get_session()
139 self.assertEquals(session_id, audit["sessionId"])
140 self.assertEquals(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
143 # Add the user to a group
145 self.discardMessages()
147 self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
148 messages = self.waitForMessages(1)
149 print("Received %d messages" % len(messages))
152 "Did not receive the expected number of messages")
153 audit = messages[0]["groupChange"]
155 self.assertEqual("Added", audit["action"])
156 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
157 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
158 self.assertTrue(user_dn.lower(), audit["user"].lower())
159 self.assertTrue(group_dn.lower(), audit["group"].lower())
160 self.assertRegexpMatches(audit["remoteAddress"],
162 self.assertTrue(self.is_guid(audit["sessionId"]))
163 session_id = self.get_session()
164 self.assertEquals(session_id, audit["sessionId"])
165 service_description = self.get_service_description()
166 self.assertEquals(service_description, "LDAP")
169 # Add the user to another group
171 self.discardMessages()
172 self.ldb.add_remove_group_members(GROUP_NAME_02, [USER_NAME])
174 messages = self.waitForMessages(1)
175 print("Received %d messages" % len(messages))
178 "Did not receive the expected number of messages")
179 audit = messages[0]["groupChange"]
181 self.assertEqual("Added", audit["action"])
182 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
183 group_dn = "cn=" + GROUP_NAME_02 + ",cn=users," + self.base_dn
184 self.assertTrue(user_dn.lower(), audit["user"].lower())
185 self.assertTrue(group_dn.lower(), audit["group"].lower())
186 self.assertRegexpMatches(audit["remoteAddress"],
188 self.assertTrue(self.is_guid(audit["sessionId"]))
189 session_id = self.get_session()
190 self.assertEquals(session_id, audit["sessionId"])
191 service_description = self.get_service_description()
192 self.assertEquals(service_description, "LDAP")
195 # Remove the user from a group
197 self.discardMessages()
198 self.ldb.add_remove_group_members(
201 add_members_operation=False)
202 messages = self.waitForMessages(1)
203 print("Received %d messages" % len(messages))
206 "Did not receive the expected number of messages")
207 audit = messages[0]["groupChange"]
209 self.assertEqual("Removed", audit["action"])
210 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
211 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
212 self.assertTrue(user_dn.lower(), audit["user"].lower())
213 self.assertTrue(group_dn.lower(), audit["group"].lower())
214 self.assertRegexpMatches(audit["remoteAddress"],
216 self.assertTrue(self.is_guid(audit["sessionId"]))
217 session_id = self.get_session()
218 self.assertEquals(session_id, audit["sessionId"])
219 service_description = self.get_service_description()
220 self.assertEquals(service_description, "LDAP")
223 # Re-add the user to a group
225 self.discardMessages()
226 self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
228 messages = self.waitForMessages(1)
229 print("Received %d messages" % len(messages))
232 "Did not receive the expected number of messages")
233 audit = messages[0]["groupChange"]
235 self.assertEqual("Added", audit["action"])
236 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
237 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
238 self.assertTrue(user_dn.lower(), audit["user"].lower())
239 self.assertTrue(group_dn.lower(), audit["group"].lower())
240 self.assertRegexpMatches(audit["remoteAddress"],
242 self.assertTrue(self.is_guid(audit["sessionId"]))
243 session_id = self.get_session()
244 self.assertEquals(session_id, audit["sessionId"])
245 service_description = self.get_service_description()
246 self.assertEquals(service_description, "LDAP")
248 def test_change_primary_group(self):
251 # Wait for the primary group change for the created user.
253 messages = self.waitForMessages(2)
254 print("Received %d messages" % len(messages))
257 "Did not receive the expected number of messages")
259 # Check the PrimaryGroup message
260 audit = messages[0]["groupChange"]
262 self.assertEqual("PrimaryGroup", audit["action"])
263 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
264 group_dn = "cn=domain users,cn=users," + self.base_dn
265 self.assertTrue(user_dn.lower(), audit["user"].lower())
266 self.assertTrue(group_dn.lower(), audit["group"].lower())
267 self.assertRegexpMatches(audit["remoteAddress"],
269 self.assertTrue(self.is_guid(audit["sessionId"]))
270 session_id = self.get_session()
271 self.assertEquals(session_id, audit["sessionId"])
272 service_description = self.get_service_description()
273 self.assertEquals(service_description, "LDAP")
275 # Check the Add message for the new users primary group
276 audit = messages[1]["groupChange"]
278 self.assertEqual("Added", audit["action"])
279 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
280 group_dn = "cn=domain users,cn=users," + self.base_dn
281 self.assertTrue(user_dn.lower(), audit["user"].lower())
282 self.assertTrue(group_dn.lower(), audit["group"].lower())
283 self.assertRegexpMatches(audit["remoteAddress"],
285 self.assertTrue(self.is_guid(audit["sessionId"]))
286 session_id = self.get_session()
287 self.assertEquals(session_id, audit["sessionId"])
288 self.assertEquals(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
292 # Add the user to a group, the user needs to be a member of a group
293 # before there primary group can be set to that group.
295 self.discardMessages()
297 self.ldb.add_remove_group_members(GROUP_NAME_01, [USER_NAME])
298 messages = self.waitForMessages(1)
299 print("Received %d messages" % len(messages))
302 "Did not receive the expected number of messages")
303 audit = messages[0]["groupChange"]
305 self.assertEqual("Added", audit["action"])
306 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
307 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
308 self.assertTrue(user_dn.lower(), audit["user"].lower())
309 self.assertTrue(group_dn.lower(), audit["group"].lower())
310 self.assertRegexpMatches(audit["remoteAddress"],
312 self.assertTrue(self.is_guid(audit["sessionId"]))
313 session_id = self.get_session()
314 self.assertEquals(session_id, audit["sessionId"])
315 service_description = self.get_service_description()
316 self.assertEquals(service_description, "LDAP")
317 self.assertEquals(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
321 # Change the primary group of a user
323 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
324 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
325 # get the primaryGroupToken of the group
326 res = self.ldb.search(base=group_dn, attrs=["primaryGroupToken"],
327 scope=ldb.SCOPE_BASE)
328 group_id = res[0]["primaryGroupToken"]
330 # set primaryGroupID attribute of the user to that group
332 m.dn = ldb.Dn(self.ldb, user_dn)
333 m["primaryGroupID"] = ldb.MessageElement(
337 self.discardMessages()
341 # Wait for the primary group change.
342 # Will see the user removed from the new group
343 # the user added to their old primary group
344 # and a new primary group event.
346 messages = self.waitForMessages(3)
347 print("Received %d messages" % len(messages))
350 "Did not receive the expected number of messages")
352 audit = messages[0]["groupChange"]
353 self.assertEqual("Removed", audit["action"])
354 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
355 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
356 self.assertTrue(user_dn.lower(), audit["user"].lower())
357 self.assertTrue(group_dn.lower(), audit["group"].lower())
358 self.assertRegexpMatches(audit["remoteAddress"],
360 self.assertTrue(self.is_guid(audit["sessionId"]))
361 session_id = self.get_session()
362 self.assertEquals(session_id, audit["sessionId"])
363 service_description = self.get_service_description()
364 self.assertEquals(service_description, "LDAP")
365 self.assertEquals(EVT_ID_USER_REMOVED_FROM_GLOBAL_SEC_GROUP,
368 audit = messages[1]["groupChange"]
370 self.assertEqual("Added", audit["action"])
371 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
372 group_dn = "cn=domain users,cn=users," + self.base_dn
373 self.assertTrue(user_dn.lower(), audit["user"].lower())
374 self.assertTrue(group_dn.lower(), audit["group"].lower())
375 self.assertRegexpMatches(audit["remoteAddress"],
377 self.assertTrue(self.is_guid(audit["sessionId"]))
378 session_id = self.get_session()
379 self.assertEquals(session_id, audit["sessionId"])
380 service_description = self.get_service_description()
381 self.assertEquals(service_description, "LDAP")
382 self.assertEquals(EVT_ID_USER_ADDED_TO_GLOBAL_SEC_GROUP,
385 audit = messages[2]["groupChange"]
387 self.assertEqual("PrimaryGroup", audit["action"])
388 user_dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
389 group_dn = "cn=" + GROUP_NAME_01 + ",cn=users," + self.base_dn
390 self.assertTrue(user_dn.lower(), audit["user"].lower())
391 self.assertTrue(group_dn.lower(), audit["group"].lower())
392 self.assertRegexpMatches(audit["remoteAddress"],
394 self.assertTrue(self.is_guid(audit["sessionId"]))
395 session_id = self.get_session()
396 self.assertEquals(session_id, audit["sessionId"])
397 service_description = self.get_service_description()
398 self.assertEquals(service_description, "LDAP")